
Initial commit

Gregory Eremin 2018-07-29 16:29:04 +02:00
commit 16e8e9ec19
4 changed files with 429 additions and 0 deletions

18
LICENSE Normal file

@@ -0,0 +1,18 @@
Copyright 2018 Gregory Eremin

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

37
cmd/main.go Normal file

@@ -0,0 +1,37 @@
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/localhots/koff"
	"github.com/localhots/pretty"
)

const topicName = "__consumer_offsets"

func main() {
	brokers := flag.String("brokers", "", "Comma separated list of brokers")
	flag.Parse()
	if *brokers == "" {
		fmt.Println("Brokers list required")
		flag.Usage()
		os.Exit(1)
	}

	c, err := koff.NewConsumer(strings.Split(*brokers, ","), false)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer c.Close()

	for msg := range c.Messages() {
		if msg.GroupMessage != nil {
			pretty.Println("MESSAGE", msg)
		}
	}
}

156
consumer.go Normal file

@@ -0,0 +1,156 @@
package koff

import (
	"context"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/localhots/gobelt/log"
)

const topicName = "__consumer_offsets"

// Consumer reads messages from the __consumer_offsets topic and decodes them
// into Messages.
type Consumer struct {
	client  sarama.Client
	cons    sarama.Consumer
	pcs     map[int32]sarama.PartitionConsumer
	msgs    chan Message
	watch   bool
	lock    sync.Mutex
	wg      sync.WaitGroup
	pticker *time.Ticker
}

// NewConsumer creates a new Kafka offsets topic consumer.
func NewConsumer(brokers []string, watch bool) (*Consumer, error) {
	log.Info(context.TODO(), "Creating client")
	client, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		return nil, err
	}

	log.Info(context.TODO(), "Creating consumer")
	sc, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	c := &Consumer{
		client: client,
		cons:   sc,
		pcs:    make(map[int32]sarama.PartitionConsumer),
		msgs:   make(chan Message),
		watch:  watch,
	}

	log.Info(context.TODO(), "Setting up partition consumers")
	if err := c.setupPartitionConsumers(); err != nil {
		return nil, err
	}

	if watch {
		c.wg.Add(1)
		go c.keepPartitionConsumersUpdated(30 * time.Second)
	}

	return c, nil
}

// Messages returns a read-only channel of offset messages.
func (c *Consumer) Messages() <-chan Message {
	return c.msgs
}

// Close shuts down the consumer.
func (c *Consumer) Close() error {
	// The ticker only exists when the consumer watches for new partitions;
	// guard against a nil pointer otherwise.
	if c.pticker != nil {
		c.pticker.Stop()
	}
	for _, pc := range c.pcs {
		if err := pc.Close(); err != nil {
			return err
		}
	}
	if err := c.cons.Close(); err != nil {
		return err
	}
	c.wg.Wait()
	close(c.msgs)
	return nil
}

// keepPartitionConsumersUpdated periodically refreshes the partition list so
// that consumers are started for partitions added after startup. Refresh
// errors are currently ignored.
func (c *Consumer) keepPartitionConsumersUpdated(interval time.Duration) {
	defer c.wg.Done()
	c.pticker = time.NewTicker(interval)
	for range c.pticker.C {
		c.setupPartitionConsumers()
	}
}

// setupPartitionConsumers starts a consumer for every partition of the offsets
// topic that is not consumed yet and stops consumers for partitions that no
// longer exist.
func (c *Consumer) setupPartitionConsumers() error {
	log.Info(context.TODO(), "Fetching partition list")
	partitions, err := c.cons.Partitions(topicName)
	if err != nil {
		return err
	}

	for _, partition := range partitions {
		if _, ok := c.pcs[partition]; !ok {
			c.wg.Add(1)
			go c.consumePartition(partition)
		}
	}

	pmap := make(map[int32]struct{}, len(partitions))
	for _, partition := range partitions {
		pmap[partition] = struct{}{}
	}
	for partition, pc := range c.pcs {
		if _, ok := pmap[partition]; !ok {
			err := pc.Close()
			if err != nil {
				return err
			}
			c.lock.Lock()
			delete(c.pcs, partition)
			c.lock.Unlock()
		}
	}

	return nil
}

// consumePartition consumes a single partition of the offsets topic, decodes
// each record and forwards it to the messages channel.
func (c *Consumer) consumePartition(partition int32) error {
	defer c.wg.Done()
	pc, err := c.cons.ConsumePartition(topicName, partition, sarama.OffsetOldest)
	if err != nil {
		return err
	}
	log.Info(context.TODO(), "Started partition consumer", log.F{"p": partition})

	c.lock.Lock()
	c.pcs[partition] = pc
	c.lock.Unlock()

	var maxOffset *int64
	if c.watch {
		off, err := c.client.GetOffset(topicName, partition, sarama.OffsetNewest)
		if err != nil {
			return err
		}
		maxOffset = &off
	}

	for msg := range pc.Messages() {
		if msg.Value == nil {
			// Skip tombstone records that carry no value.
			continue
		}
		c.msgs <- Decode(msg.Key, msg.Value)
		if maxOffset != nil && msg.Offset == *maxOffset {
			return nil
		}
	}
	return nil
}

218
decoder.go Normal file

@@ -0,0 +1,218 @@
package koff

import (
	"encoding/binary"
	"fmt"
	"runtime/debug"
	"time"
)

// Message is the main structure that wraps a consumer offsets topic message.
type Message struct {
	Consumer      string
	OffsetMessage *OffsetMessage
	GroupMessage  *GroupMessage
}

// OffsetMessage is a kind of message that carries an individual consumer
// offset.
type OffsetMessage struct {
	Topic       string
	Partition   int32
	Offset      int64
	Metadata    string
	CommittedAt time.Time
	ExpiresAt   time.Time
}

// GroupMessage contains consumer group metadata.
type GroupMessage struct {
	ProtocolType string
	GenerationID int32
	LeaderID     string
	Protocol     string
	Members      []GroupMember
}

// GroupMember contains metadata for a consumer group member.
type GroupMember struct {
	ID               string
	ClientID         string
	ClientHost       string
	SessionTimeout   int32
	RebalanceTimeout int32
	Subscription     []TopicAndPartition
	Assignment       []TopicAndPartition
}

// TopicAndPartition is a tuple of topic and partition.
type TopicAndPartition struct {
	Topic     string
	Partition int32
}

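// For illustration (hypothetical values, not taken from a real topic): an
// offset commit made by group "billing" for topic "payments", partition 0,
// at offset 1234 decodes into
//
//	Message{
//		Consumer:      "billing",
//		OffsetMessage: &OffsetMessage{Topic: "payments", Partition: 0, Offset: 1234},
//	}
//
// while a group metadata record leaves OffsetMessage nil and populates
// GroupMessage instead.
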
// Decode decodes message key and value into a Message.
func Decode(key, val []byte) Message {
	var m Message
	m.decodeKey(key)
	if m.OffsetMessage != nil {
		m.decodeOffsetMessage(val)
	} else {
		m.decodeGroupMetadata(val)
	}
	return m
}

// Key structure:
// [2] Version, uint16 big endian
// [2] Consumer name length, uint16 big endian
// [^] Consumer name
// if Version is 0 or 1 {
//     [2] Topic length, uint16 big endian
//     [^] Topic
//     [4] Partition, uint32 big endian
// }
func (m *Message) decodeKey(key []byte) {
	buf := &buffer{data: key}
	version := buf.readInt16()
	m.Consumer = buf.readString()
	if version < 2 {
		m.OffsetMessage = &OffsetMessage{
			Topic:     buf.readString(),
			Partition: buf.readInt32(),
		}
	}
}

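// As a worked example (hypothetical group and topic names): a version 1 key
// for consumer group "grp" committing an offset for topic "events",
// partition 3, is laid out as
//
//	00 01                    version = 1
//	00 03 67 72 70           "grp" (length-prefixed group name)
//	00 06 65 76 65 6e 74 73  "events" (length-prefixed topic)
//	00 00 00 03              partition = 3
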
// Value structure:
// [2] Version, uint16 big endian
// [8] Offset, int64 big endian
// [2] Meta length, uint16 big endian
// [^] Meta
// [8] Commit time, unix timestamp with millisecond precision
// if Version is 1 {
//     [8] Expire time, unix timestamp with millisecond precision
// }
func (m *Message) decodeOffsetMessage(val []byte) {
	buf := &buffer{data: val}
	om := m.OffsetMessage
	version := buf.readInt16()
	om.Offset = buf.readInt64()
	om.Metadata = buf.readString()
	om.CommittedAt = makeTime(buf.readInt64())
	if version == 1 {
		om.ExpiresAt = makeTime(buf.readInt64())
	}
}

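// As a worked example (hypothetical numbers): a version 0 value recording
// offset 42 with empty metadata is laid out as
//
//	00 00                    version = 0
//	00 00 00 00 00 00 00 2a  offset = 42
//	00 00                    metadata = ""
//	xx xx xx xx xx xx xx xx  commit timestamp in milliseconds
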
// decodeGroupMetadata decodes a consumer group metadata record. The format is
// only partially handled: versions above 1 are skipped and only the first
// group member is decoded.
func (m *Message) decodeGroupMetadata(val []byte) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
			debug.PrintStack()
			fmt.Println(val, m)
		}
	}()

	buf := &buffer{data: val}
	m.GroupMessage = &GroupMessage{Members: []GroupMember{}}
	gm := m.GroupMessage

	version := buf.readInt16()
	if version > 1 {
		return
	}

	gm.ProtocolType = buf.readString()
	gm.GenerationID = buf.readInt32()
	if gm.GenerationID > 1 {
		next := buf.readInt32()
		if next == -1 {
			return
		}
		buf.pos -= 4
	}
	gm.LeaderID = buf.readString()
	gm.Protocol = buf.readString()

	ary := buf.readInt32()
	if ary == 0 {
		return
	}
	gm.Members = append(gm.Members, GroupMember{
		ID:               buf.readString(),
		ClientID:         buf.readString(),
		ClientHost:       buf.readString(),
		SessionTimeout:   buf.readInt32(),
		RebalanceTimeout: buf.readInt32(),
	})
	gm.Members[0].Subscription = readAssignment(buf)
	gm.Members[0].Assignment = readAssignment(buf)
}

func readAssignment(buf *buffer) []TopicAndPartition {
	ass := []TopicAndPartition{}
	buf.skip(2) // Eh
	size := buf.readInt32()
	if size == 0 {
		buf.skip(4)
		return ass
	}
	for i := 0; i < int(size); i++ {
		ass = append(ass, readTopicAndSubscription(buf))
		buf.skip(4) // Hash key or smth
	}
	return ass
}

func readTopicAndSubscription(buf *buffer) TopicAndPartition {
	return TopicAndPartition{
		Topic:     buf.readString(),
		Partition: buf.readInt32(),
	}
}

// makeTime converts a Unix timestamp with millisecond precision into a
// time.Time value.
func makeTime(ts int64) time.Time {
	return time.Unix(ts/1000, (ts%1000)*1000000)
}

//
// Buffer
//

// buffer is a minimal cursor over a byte slice used by the decoder.
type buffer struct {
	data []byte
	pos  int
}

func (b *buffer) skip(n int) {
	b.pos += n
}

func (b *buffer) readBytes(n int) []byte {
	b.pos += n
	return b.data[b.pos-n : b.pos]
}

func (b *buffer) readInt16() int16 {
	i := binary.BigEndian.Uint16(b.data[b.pos:])
	b.pos += 2
	return int16(i)
}

func (b *buffer) readInt32() int32 {
	i := binary.BigEndian.Uint32(b.data[b.pos:])
	b.pos += 4
	return int32(i)
}

func (b *buffer) readInt64() int64 {
	i := binary.BigEndian.Uint64(b.data[b.pos:])
	b.pos += 8
	return int64(i)
}

func (b *buffer) readString() string {
	strlen := int(b.readInt16())
	b.pos += strlen
	return string(b.data[b.pos-strlen : b.pos])
}

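// For example, readString applied to the (hypothetical) bytes
// {0x00, 0x03, 'f', 'o', 'o'} consumes the 2-byte length prefix followed by
// 3 bytes of payload and returns "foo".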