From 119d6d0d2caaf380281745bf8a39a78bca5fa385 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 29 Jul 2018 18:53:36 +0200 Subject: [PATCH] Add context for logging and cancellation --- consumer.go | 33 +++++++++++++++++++++------- decoder.go | 63 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/consumer.go b/consumer.go index 995b572..959d518 100644 --- a/consumer.go +++ b/consumer.go @@ -20,6 +20,8 @@ type Consumer struct { msgs chan Message watch bool + ctx context.Context + stopFn func() lock sync.Mutex wg sync.WaitGroup pticker *time.Ticker @@ -27,25 +29,29 @@ type Consumer struct { // NewConsumer creates a new Kafka offsets topic consumer. func NewConsumer(brokers []string, watch bool) (*Consumer, error) { - log.Info(context.TODO(), "Creating client") + ctx := context.Background() + log.Info(ctx, "Creating client", log.F{"brokers": brokers}) client, err := sarama.NewClient(brokers, sarama.NewConfig()) if err != nil { return nil, err } - log.Info(context.TODO(), "Creating consumer") + log.Info(ctx, "Creating consumer") sc, err := sarama.NewConsumerFromClient(client) if err != nil { return nil, err } + ctx, cancel := context.WithCancel(ctx) c := &Consumer{ client: client, cons: sc, pcs: make(map[int32]sarama.PartitionConsumer), msgs: make(chan Message), watch: watch, + ctx: ctx, + stopFn: cancel, } - log.Info(context.TODO(), "Setting up partition consumers") + log.Info(ctx, "Setting up partition consumers") if err := c.setupPartitionConsumers(); err != nil { return nil, err } @@ -65,6 +71,7 @@ func (c *Consumer) Messages() <-chan Message { // Close shuts down the consumer. func (c *Consumer) Close() error { + c.stopFn() c.pticker.Stop() for _, pc := range c.pcs { if err := pc.Close(); err != nil { @@ -81,13 +88,20 @@ func (c *Consumer) Close() error { func (c *Consumer) keepPartitionConsumersUpdated(interval time.Duration) { c.pticker = time.NewTicker(interval) - for range c.pticker.C { - c.setupPartitionConsumers() + for { + select { + case <-c.pticker.C: + if err := c.setupPartitionConsumers(); err != nil { + log.Error(c.ctx, "Failed to setup update partition consumers", log.F{"error": err}) + } + case <-c.ctx.Done(): + return + } } } func (c *Consumer) setupPartitionConsumers() error { - log.Info(context.TODO(), "Fetching partition list") + log.Info(c.ctx, "Fetching partition list") partitions, err := c.cons.Partitions(topicName) if err != nil { return err @@ -122,12 +136,12 @@ func (c *Consumer) setupPartitionConsumers() error { func (c *Consumer) consumePartition(partition int32) error { defer c.wg.Done() + ctx := log.ContextWithFields(c.ctx, log.F{"partition": partition}) pc, err := c.cons.ConsumePartition(topicName, partition, sarama.OffsetOldest) if err != nil { return err } - log.Info(context.TODO(), "Started partition consumer", log.F{"p": partition}) c.lock.Lock() c.pcs[partition] = pc @@ -135,6 +149,7 @@ func (c *Consumer) consumePartition(partition int32) error { var maxOffset *int64 if c.watch { + log.Debug(ctx, "Fetching last offset") off, err := c.client.GetOffset(topicName, partition, sarama.OffsetNewest) if err != nil { return err @@ -142,11 +157,13 @@ func (c *Consumer) consumePartition(partition int32) error { maxOffset = &off } + log.Info(ctx, "Started partition consumer") for msg := range pc.Messages() { if msg.Value == nil { continue } - c.msgs <- Decode(msg.Key, msg.Value) + ctx := log.ContextWithFields(ctx, log.F{"offset": msg.Offset}) + c.msgs <- Decode(ctx, msg.Key, msg.Value) if maxOffset != nil && msg.Offset == *maxOffset { return nil } diff --git a/decoder.go b/decoder.go index 0925ddb..03f3d37 100644 --- a/decoder.go +++ b/decoder.go @@ -1,10 +1,12 @@ package koff import ( + "context" "encoding/binary" - "fmt" "runtime/debug" "time" + + "github.com/localhots/gobelt/log" ) // Message is the main structure that wraps a consumer offsets topic message. @@ -38,8 +40,8 @@ type GroupMember struct { ID string ClientID string ClientHost string - SessionTimeout int32 - RebalanceTimeout int32 + SessionTimeout time.Duration + RebalanceTimeout time.Duration Subscription []TopicAndPartition Assignment []TopicAndPartition } @@ -51,13 +53,24 @@ type TopicAndPartition struct { } // Decode decodes message key and value into an OffsetMessage. -func Decode(key, val []byte) Message { +func Decode(ctx context.Context, key, val []byte) Message { var m Message + defer func() { + if err := recover(); err != nil { + log.Error(ctx, "Failed to decode group metadata", log.F{ + "error": err, + "payload": val, + "message": m, + }) + debug.PrintStack() + } + }() + m.decodeKey(key) if m.OffsetMessage != nil { m.decodeOffsetMessage(val) } else { - m.decodeGroupMetadata(val) + m.decodeGroupMetadata(ctx, val) } return m @@ -105,14 +118,7 @@ func (m *Message) decodeOffsetMessage(val []byte) { } } -func (m *Message) decodeGroupMetadata(val []byte) { - defer func() { - if err := recover(); err != nil { - fmt.Println(err) - debug.PrintStack() - fmt.Println(val, m) - } - }() +func (m *Message) decodeGroupMetadata(ctx context.Context, val []byte) { buf := &buffer{data: val} m.GroupMessage = &GroupMessage{Members: []GroupMember{}} gm := m.GroupMessage @@ -123,6 +129,9 @@ func (m *Message) decodeGroupMetadata(val []byte) { gm.ProtocolType = buf.readString() gm.GenerationID = buf.readInt32() if gm.GenerationID > 1 { + // Messages with generation greater than one often lack remaining fields + // If followin sequence is like 0xFF 0xFF 0xFF 0xFF 0x00 0x00 ... + // then just skip that shit next := buf.readInt32() if next == -1 { return @@ -133,20 +142,18 @@ func (m *Message) decodeGroupMetadata(val []byte) { gm.LeaderID = buf.readString() gm.Protocol = buf.readString() - ary := buf.readInt32() - if ary == 0 { - return + arySize := int(buf.readInt32()) + for i := 0; i < arySize; i++ { + gm.Members = append(gm.Members, GroupMember{ + ID: buf.readString(), + ClientID: buf.readString(), + ClientHost: buf.readString(), + SessionTimeout: makeDur(buf.readInt32()), + RebalanceTimeout: makeDur(buf.readInt32()), + Subscription: readAssignment(buf), + Assignment: readAssignment(buf), + }) } - - gm.Members = append(gm.Members, GroupMember{ - ID: buf.readString(), - ClientID: buf.readString(), - ClientHost: buf.readString(), - SessionTimeout: buf.readInt32(), - RebalanceTimeout: buf.readInt32(), - }) - gm.Members[0].Subscription = readAssignment(buf) - gm.Members[0].Assignment = readAssignment(buf) } func readAssignment(buf *buffer) []TopicAndPartition { @@ -175,6 +182,10 @@ func makeTime(ts int64) time.Time { return time.Unix(ts/1000, (ts%1000)*1000000) } +func makeDur(to int32) time.Duration { + return time.Duration(to) * time.Millisecond +} + // // Buffer //