Add context for logging and cancellation

parent 16e8e9ec19
commit 119d6d0d2c

consumer.go (33 changed lines)

@@ -20,6 +20,8 @@ type Consumer struct {
     msgs  chan Message
     watch bool

+    ctx     context.Context
+    stopFn  func()
     lock    sync.Mutex
     wg      sync.WaitGroup
     pticker *time.Ticker
@@ -27,25 +29,29 @@ type Consumer struct {

 // NewConsumer creates a new Kafka offsets topic consumer.
 func NewConsumer(brokers []string, watch bool) (*Consumer, error) {
-    log.Info(context.TODO(), "Creating client")
+    ctx := context.Background()
+    log.Info(ctx, "Creating client", log.F{"brokers": brokers})
     client, err := sarama.NewClient(brokers, sarama.NewConfig())
     if err != nil {
         return nil, err
     }

-    log.Info(context.TODO(), "Creating consumer")
+    log.Info(ctx, "Creating consumer")
     sc, err := sarama.NewConsumerFromClient(client)
     if err != nil {
         return nil, err
     }
+    ctx, cancel := context.WithCancel(ctx)
     c := &Consumer{
         client: client,
         cons:   sc,
         pcs:    make(map[int32]sarama.PartitionConsumer),
         msgs:   make(chan Message),
         watch:  watch,
+        ctx:    ctx,
+        stopFn: cancel,
     }
-    log.Info(context.TODO(), "Setting up partition consumers")
+    log.Info(ctx, "Setting up partition consumers")
     if err := c.setupPartitionConsumers(); err != nil {
         return nil, err
     }
@@ -65,6 +71,7 @@ func (c *Consumer) Messages() <-chan Message {

 // Close shuts down the consumer.
 func (c *Consumer) Close() error {
+    c.stopFn()
     c.pticker.Stop()
     for _, pc := range c.pcs {
         if err := pc.Close(); err != nil {
@@ -81,13 +88,20 @@ func (c *Consumer) Close() error {

 func (c *Consumer) keepPartitionConsumersUpdated(interval time.Duration) {
     c.pticker = time.NewTicker(interval)
-    for range c.pticker.C {
-        c.setupPartitionConsumers()
+    for {
+        select {
+        case <-c.pticker.C:
+            if err := c.setupPartitionConsumers(); err != nil {
+                log.Error(c.ctx, "Failed to setup update partition consumers", log.F{"error": err})
+            }
+        case <-c.ctx.Done():
+            return
+        }
     }
 }

 func (c *Consumer) setupPartitionConsumers() error {
-    log.Info(context.TODO(), "Fetching partition list")
+    log.Info(c.ctx, "Fetching partition list")
     partitions, err := c.cons.Partitions(topicName)
     if err != nil {
         return err
@@ -122,12 +136,12 @@ func (c *Consumer) setupPartitionConsumers() error {

 func (c *Consumer) consumePartition(partition int32) error {
     defer c.wg.Done()
+    ctx := log.ContextWithFields(c.ctx, log.F{"partition": partition})

     pc, err := c.cons.ConsumePartition(topicName, partition, sarama.OffsetOldest)
     if err != nil {
         return err
     }
-    log.Info(context.TODO(), "Started partition consumer", log.F{"p": partition})

     c.lock.Lock()
     c.pcs[partition] = pc
@@ -135,6 +149,7 @@ func (c *Consumer) consumePartition(partition int32) error {

     var maxOffset *int64
     if c.watch {
+        log.Debug(ctx, "Fetching last offset")
         off, err := c.client.GetOffset(topicName, partition, sarama.OffsetNewest)
         if err != nil {
             return err
@@ -142,11 +157,13 @@ func (c *Consumer) consumePartition(partition int32) error {
         maxOffset = &off
     }

+    log.Info(ctx, "Started partition consumer")
     for msg := range pc.Messages() {
         if msg.Value == nil {
             continue
         }
-        c.msgs <- Decode(msg.Key, msg.Value)
+        ctx := log.ContextWithFields(ctx, log.F{"offset": msg.Offset})
+        c.msgs <- Decode(ctx, msg.Key, msg.Value)
         if maxOffset != nil && msg.Offset == *maxOffset {
             return nil
         }
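
The consumer.go change boils down to one pattern: store a cancellable context on the Consumer, have Close call the cancel function, and replace the bare "for range ticker.C" loop with a select that also watches ctx.Done(). A minimal standalone sketch of that shutdown pattern, using only the standard library (the poller type, its loop, and the printed message are illustrative names, not code from this repository):

package main

import (
	"context"
	"fmt"
	"time"
)

// poller mirrors the shape of Consumer: it stores the context that scopes
// its background work and the cancel function that Close will call.
type poller struct {
	ctx    context.Context
	stopFn func()
	ticker *time.Ticker
}

func newPoller(interval time.Duration) *poller {
	ctx, cancel := context.WithCancel(context.Background())
	p := &poller{ctx: ctx, stopFn: cancel, ticker: time.NewTicker(interval)}
	go p.loop()
	return p
}

// loop uses the same select as keepPartitionConsumersUpdated: do the
// periodic work when the ticker fires, return when the context is cancelled.
func (p *poller) loop() {
	for {
		select {
		case <-p.ticker.C:
			fmt.Println("refreshing") // stands in for setupPartitionConsumers
		case <-p.ctx.Done():
			return
		}
	}
}

// Close cancels the context so loop exits, then stops the ticker,
// mirroring the order used in Consumer.Close.
func (p *poller) Close() {
	p.stopFn()
	p.ticker.Stop()
}

func main() {
	p := newPoller(200 * time.Millisecond)
	time.Sleep(time.Second)
	p.Close()
}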

decoder.go (63 changed lines)

@@ -1,10 +1,12 @@
 package koff

 import (
+    "context"
     "encoding/binary"
-    "fmt"
     "runtime/debug"
     "time"
+
+    "github.com/localhots/gobelt/log"
 )

 // Message is the main structure that wraps a consumer offsets topic message.
@@ -38,8 +40,8 @@ type GroupMember struct {
     ID               string
     ClientID         string
     ClientHost       string
-    SessionTimeout   int32
-    RebalanceTimeout int32
+    SessionTimeout   time.Duration
+    RebalanceTimeout time.Duration
     Subscription     []TopicAndPartition
     Assignment       []TopicAndPartition
 }
@@ -51,13 +53,24 @@ type TopicAndPartition struct {
 }

 // Decode decodes message key and value into an OffsetMessage.
-func Decode(key, val []byte) Message {
+func Decode(ctx context.Context, key, val []byte) Message {
     var m Message
+    defer func() {
+        if err := recover(); err != nil {
+            log.Error(ctx, "Failed to decode group metadata", log.F{
+                "error":   err,
+                "payload": val,
+                "message": m,
+            })
+            debug.PrintStack()
+        }
+    }()
+
     m.decodeKey(key)
     if m.OffsetMessage != nil {
         m.decodeOffsetMessage(val)
     } else {
-        m.decodeGroupMetadata(val)
+        m.decodeGroupMetadata(ctx, val)
     }

     return m
@@ -105,14 +118,7 @@ func (m *Message) decodeOffsetMessage(val []byte) {
     }
 }

-func (m *Message) decodeGroupMetadata(val []byte) {
-    defer func() {
-        if err := recover(); err != nil {
-            fmt.Println(err)
-            debug.PrintStack()
-            fmt.Println(val, m)
-        }
-    }()
+func (m *Message) decodeGroupMetadata(ctx context.Context, val []byte) {
     buf := &buffer{data: val}
     m.GroupMessage = &GroupMessage{Members: []GroupMember{}}
     gm := m.GroupMessage
@@ -123,6 +129,9 @@ func (m *Message) decodeGroupMetadata(val []byte) {
     gm.ProtocolType = buf.readString()
     gm.GenerationID = buf.readInt32()
     if gm.GenerationID > 1 {
+        // Messages with generation greater than one often lack remaining fields
+        // If followin sequence is like 0xFF 0xFF 0xFF 0xFF 0x00 0x00 ...
+        // then just skip that shit
         next := buf.readInt32()
         if next == -1 {
             return
@@ -133,20 +142,18 @@ func (m *Message) decodeGroupMetadata(val []byte) {
     gm.LeaderID = buf.readString()
     gm.Protocol = buf.readString()

-    ary := buf.readInt32()
-    if ary == 0 {
-        return
+    arySize := int(buf.readInt32())
+    for i := 0; i < arySize; i++ {
+        gm.Members = append(gm.Members, GroupMember{
+            ID:               buf.readString(),
+            ClientID:         buf.readString(),
+            ClientHost:       buf.readString(),
+            SessionTimeout:   makeDur(buf.readInt32()),
+            RebalanceTimeout: makeDur(buf.readInt32()),
+            Subscription:     readAssignment(buf),
+            Assignment:       readAssignment(buf),
+        })
     }
-
-    gm.Members = append(gm.Members, GroupMember{
-        ID:         buf.readString(),
-        ClientID:   buf.readString(),
-        ClientHost: buf.readString(),
-        SessionTimeout:   buf.readInt32(),
-        RebalanceTimeout: buf.readInt32(),
-    })
-    gm.Members[0].Subscription = readAssignment(buf)
-    gm.Members[0].Assignment = readAssignment(buf)
 }

 func readAssignment(buf *buffer) []TopicAndPartition {
@@ -175,6 +182,10 @@ func makeTime(ts int64) time.Time {
     return time.Unix(ts/1000, (ts%1000)*1000000)
 }

+func makeDur(to int32) time.Duration {
+    return time.Duration(to) * time.Millisecond
+}
+
 //
 // Buffer
 //
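
Two smaller ideas from the decoder.go change, shown in isolation: group metadata carries session and rebalance timeouts as int32 millisecond counts that are now converted to time.Duration (the new makeDur helper), and decoding of an untrusted payload is wrapped in a deferred recover so a malformed message is logged instead of crashing the consumer. A rough standalone sketch under those assumptions, using the standard library logger in place of the gobelt/log package (millisToDuration, decodeTimeout and safeDecode are illustrative names only):

package main

import (
	"encoding/binary"
	"log"
	"time"
)

// millisToDuration does what makeDur does: convert an int32 millisecond
// count into a time.Duration.
func millisToDuration(ms int32) time.Duration {
	return time.Duration(ms) * time.Millisecond
}

// decodeTimeout reads a big-endian int32 from the payload and converts it.
// A short or corrupt payload makes this panic, which is why Decode now
// installs a deferred recover before touching the bytes.
func decodeTimeout(val []byte) time.Duration {
	return millisToDuration(int32(binary.BigEndian.Uint32(val)))
}

// safeDecode guards the decode with recover, logging the failure and the
// raw payload instead of letting the panic take down the consumer.
func safeDecode(val []byte) (d time.Duration) {
	defer func() {
		if err := recover(); err != nil {
			log.Printf("failed to decode payload: %v (payload=%v)", err, val)
		}
	}()
	return decodeTimeout(val)
}

func main() {
	log.Println(safeDecode([]byte{0x00, 0x00, 0x75, 0x30})) // 30000 ms -> 30s
	log.Println(safeDecode([]byte{0x00}))                   // too short: logged, zero value returned
}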