From 16e8e9ec199cf10314fa88c536fa299d31f08250 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 29 Jul 2018 16:29:04 +0200 Subject: [PATCH] Initial commit --- LICENSE | 18 +++++ cmd/main.go | 37 +++++++++ consumer.go | 156 +++++++++++++++++++++++++++++++++++++ decoder.go | 218 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 429 insertions(+) create mode 100644 LICENSE create mode 100644 cmd/main.go create mode 100644 consumer.go create mode 100644 decoder.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ba7a548 --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +Copyright 2018 Gregory Eremin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..ed5f29f --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "strings" + + "github.com/localhots/koff" + "github.com/localhots/pretty" +) + +const topicName = "__consumer_offsets" + +func main() { + brokers := flag.String("brokers", "", "Comma separated list of brokers") + flag.Parse() + + if *brokers == "" { + fmt.Println("Brokers list required") + flag.Usage() + os.Exit(1) + } + + c, err := koff.NewConsumer(strings.Split(*brokers, ","), false) + if err != nil { + log.Fatalf("Failed to create consumer: %v", err) + } + defer c.Close() + + for msg := range c.Messages() { + if msg.GroupMessage != nil { + pretty.Println("MESSAGE", msg) + } + } +} diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..995b572 --- /dev/null +++ b/consumer.go @@ -0,0 +1,156 @@ +package koff + +import ( + "context" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/localhots/gobelt/log" +) + +const topicName = "__consumer_offsets" + +// Consumer reads messages from __consumer_offsets topic and decodes them into +// OffsetMessages. +type Consumer struct { + client sarama.Client + cons sarama.Consumer + pcs map[int32]sarama.PartitionConsumer + msgs chan Message + watch bool + + lock sync.Mutex + wg sync.WaitGroup + pticker *time.Ticker +} + +// NewConsumer creates a new Kafka offsets topic consumer. +func NewConsumer(brokers []string, watch bool) (*Consumer, error) { + log.Info(context.TODO(), "Creating client") + client, err := sarama.NewClient(brokers, sarama.NewConfig()) + if err != nil { + return nil, err + } + + log.Info(context.TODO(), "Creating consumer") + sc, err := sarama.NewConsumerFromClient(client) + if err != nil { + return nil, err + } + c := &Consumer{ + client: client, + cons: sc, + pcs: make(map[int32]sarama.PartitionConsumer), + msgs: make(chan Message), + watch: watch, + } + log.Info(context.TODO(), "Setting up partition consumers") + if err := c.setupPartitionConsumers(); err != nil { + return nil, err + } + + if watch { + c.wg.Add(1) + go c.keepPartitionConsumersUpdated(30 * time.Second) + } + + return c, nil +} + +// Messages returns a read only channel of offset messages. +func (c *Consumer) Messages() <-chan Message { + return c.msgs +} + +// Close shuts down the consumer. +func (c *Consumer) Close() error { + c.pticker.Stop() + for _, pc := range c.pcs { + if err := pc.Close(); err != nil { + return err + } + } + if err := c.cons.Close(); err != nil { + return err + } + c.wg.Wait() + close(c.msgs) + return nil +} + +func (c *Consumer) keepPartitionConsumersUpdated(interval time.Duration) { + c.pticker = time.NewTicker(interval) + for range c.pticker.C { + c.setupPartitionConsumers() + } +} + +func (c *Consumer) setupPartitionConsumers() error { + log.Info(context.TODO(), "Fetching partition list") + partitions, err := c.cons.Partitions(topicName) + if err != nil { + return err + } + + for _, partition := range partitions { + if _, ok := c.pcs[partition]; !ok { + c.wg.Add(1) + go c.consumePartition(partition) + } + } + + pmap := make(map[int32]struct{}, len(partitions)) + for _, partition := range partitions { + pmap[partition] = struct{}{} + } + + for partition, pc := range c.pcs { + if _, ok := pmap[partition]; !ok { + err := pc.Close() + if err != nil { + return err + } + c.lock.Lock() + delete(c.pcs, partition) + c.lock.Unlock() + } + } + + return nil +} + +func (c *Consumer) consumePartition(partition int32) error { + defer c.wg.Done() + + pc, err := c.cons.ConsumePartition(topicName, partition, sarama.OffsetOldest) + if err != nil { + return err + } + log.Info(context.TODO(), "Started partition consumer", log.F{"p": partition}) + + c.lock.Lock() + c.pcs[partition] = pc + c.lock.Unlock() + + var maxOffset *int64 + if c.watch { + off, err := c.client.GetOffset(topicName, partition, sarama.OffsetNewest) + if err != nil { + return err + } + maxOffset = &off + } + + for msg := range pc.Messages() { + if msg.Value == nil { + continue + } + c.msgs <- Decode(msg.Key, msg.Value) + if maxOffset != nil && msg.Offset == *maxOffset { + return nil + } + } + + return nil +} diff --git a/decoder.go b/decoder.go new file mode 100644 index 0000000..0925ddb --- /dev/null +++ b/decoder.go @@ -0,0 +1,218 @@ +package koff + +import ( + "encoding/binary" + "fmt" + "runtime/debug" + "time" +) + +// Message is the main structure that wraps a consumer offsets topic message. +type Message struct { + Consumer string + OffsetMessage *OffsetMessage + GroupMessage *GroupMessage +} + +// OffsetMessage is a kind of message that carries individual consumer offset. +type OffsetMessage struct { + Topic string + Partition int32 + Offset int64 + Metadata string + CommittedAt time.Time + ExpiresAt time.Time +} + +// GroupMessage contains consumer group metadata. +type GroupMessage struct { + ProtocolType string + GenerationID int32 + LeaderID string + Protocol string + Members []GroupMember +} + +// GroupMember contains metadata for a consumer group member. +type GroupMember struct { + ID string + ClientID string + ClientHost string + SessionTimeout int32 + RebalanceTimeout int32 + Subscription []TopicAndPartition + Assignment []TopicAndPartition +} + +// TopicAndPartition is a tuple of topic and partition. +type TopicAndPartition struct { + Topic string + Partition int32 +} + +// Decode decodes message key and value into an OffsetMessage. +func Decode(key, val []byte) Message { + var m Message + m.decodeKey(key) + if m.OffsetMessage != nil { + m.decodeOffsetMessage(val) + } else { + m.decodeGroupMetadata(val) + } + + return m +} + +// Key structure: +// [2] Version, uint16 big endian +// [2] Consumer name length, uint16 big endian +// [^] Consumer name +// if Version is 0 or 1 { +// [2] Topic length, uint16 big endian +// [^] Topic +// [4] Partition, uint32 big endian +// } +func (m *Message) decodeKey(key []byte) { + buf := &buffer{data: key} + version := buf.readInt16() + m.Consumer = buf.readString() + if version < 2 { + m.OffsetMessage = &OffsetMessage{ + Topic: buf.readString(), + Partition: buf.readInt32(), + } + } +} + +// Value structure: +// [2] Version, uint16 big endian +// [8] Offset, uint32 big endian +// [2] Meta length, uint16 big endian +// [^] Meta +// [8] Commit time, unix timestamp with millisecond precision +// if Version is 1 { +// [8] Expire time, unix timestamp with millisecond precision +// } +func (m *Message) decodeOffsetMessage(val []byte) { + buf := &buffer{data: val} + om := m.OffsetMessage + version := buf.readInt16() + om.Offset = buf.readInt64() + om.Metadata = buf.readString() + om.CommittedAt = makeTime(buf.readInt64()) + if version == 1 { + om.ExpiresAt = makeTime(buf.readInt64()) + } +} + +func (m *Message) decodeGroupMetadata(val []byte) { + defer func() { + if err := recover(); err != nil { + fmt.Println(err) + debug.PrintStack() + fmt.Println(val, m) + } + }() + buf := &buffer{data: val} + m.GroupMessage = &GroupMessage{Members: []GroupMember{}} + gm := m.GroupMessage + version := buf.readInt16() + if version > 1 { + return + } + gm.ProtocolType = buf.readString() + gm.GenerationID = buf.readInt32() + if gm.GenerationID > 1 { + next := buf.readInt32() + if next == -1 { + return + } + buf.pos -= 4 + } + + gm.LeaderID = buf.readString() + gm.Protocol = buf.readString() + + ary := buf.readInt32() + if ary == 0 { + return + } + + gm.Members = append(gm.Members, GroupMember{ + ID: buf.readString(), + ClientID: buf.readString(), + ClientHost: buf.readString(), + SessionTimeout: buf.readInt32(), + RebalanceTimeout: buf.readInt32(), + }) + gm.Members[0].Subscription = readAssignment(buf) + gm.Members[0].Assignment = readAssignment(buf) +} + +func readAssignment(buf *buffer) []TopicAndPartition { + ass := []TopicAndPartition{} + buf.skip(2) // Eh + size := buf.readInt32() + if size == 0 { + buf.skip(4) + return ass + } + for i := 0; i < int(size); i++ { + ass = append(ass, readTopicAndSubscription(buf)) + buf.skip(4) // Hash key or smth + } + return ass +} + +func readTopicAndSubscription(buf *buffer) TopicAndPartition { + return TopicAndPartition{ + Topic: buf.readString(), + Partition: buf.readInt32(), + } +} + +func makeTime(ts int64) time.Time { + return time.Unix(ts/1000, (ts%1000)*1000000) +} + +// +// Buffer +// + +type buffer struct { + data []byte + pos int +} + +func (b *buffer) skip(n int) { + b.pos += n +} + +func (b *buffer) readBytes(n int) []byte { + b.pos += n + return b.data[b.pos-n : b.pos] +} + +func (b *buffer) readInt16() int16 { + i := binary.BigEndian.Uint16(b.data[b.pos:]) + b.pos += 2 + return int16(i) +} + +func (b *buffer) readInt32() int32 { + i := binary.BigEndian.Uint32(b.data[b.pos:]) + b.pos += 4 + return int32(i) +} + +func (b *buffer) readInt64() int64 { + i := binary.BigEndian.Uint64(b.data[b.pos:]) + b.pos += 8 + return int64(i) +} + +func (b *buffer) readString() string { + strlen := int(b.readInt16()) + b.pos += strlen + return string(b.data[b.pos-strlen : b.pos]) +}