1
0
Fork 0
koff/decoder.go

235 lines
5.1 KiB
Go

package koff
import (
"context"
"encoding/binary"
"runtime/debug"
"time"
"github.com/localhots/gobelt/log"
)
// Message is the main structure that wraps a consumer offsets topic message.
type Message struct {
Consumer string
OffsetMessage *OffsetMessage
GroupMessage *GroupMessage
}
// OffsetMessage is a kind of message that carries individual consumer offset.
type OffsetMessage struct {
Topic string
Partition int32
Offset int64
Metadata string
CommittedAt time.Time
ExpiresAt time.Time
}
// GroupMessage contains consumer group metadata.
type GroupMessage struct {
ProtocolType string
GenerationID int32
LeaderID string
Protocol string
Members []GroupMember
}
// GroupMember contains metadata for a consumer group member.
type GroupMember struct {
ID string
ClientID string
ClientHost string
SessionTimeout time.Duration
RebalanceTimeout time.Duration
Subscription []TopicAndPartition
Assignment []TopicAndPartition
}
// TopicAndPartition is a tuple of topic and partition.
type TopicAndPartition struct {
Topic string
Partition int32
}
// Decode decodes message key and value into an OffsetMessage.
func Decode(ctx context.Context, key, val []byte) Message {
var m Message
defer func() {
if err := recover(); err != nil {
log.Error(ctx, "Failed to decode group metadata", log.F{
"error": err,
"payload": val,
"message": m,
})
debug.PrintStack()
}
}()
m.decodeKey(key)
if m.OffsetMessage != nil {
m.decodeOffsetMessage(val)
} else {
m.decodeGroupMetadata(ctx, val)
}
return m
}
// Complete returns true if message is complete.
func (gm GroupMessage) Complete() bool {
return gm.LeaderID != ""
}
// Key structure:
// [2] Version, uint16 big endian
// [2] Consumer name length, uint16 big endian
// [^] Consumer name
// if Version is 0 or 1 {
// [2] Topic length, uint16 big endian
// [^] Topic
// [4] Partition, uint32 big endian
// }
func (m *Message) decodeKey(key []byte) {
buf := &buffer{data: key}
version := buf.readInt16()
m.Consumer = buf.readString()
if version < 2 {
m.OffsetMessage = &OffsetMessage{
Topic: buf.readString(),
Partition: buf.readInt32(),
}
}
}
// Value structure:
// [2] Version, uint16 big endian
// [8] Offset, uint32 big endian
// [2] Meta length, uint16 big endian
// [^] Meta
// [8] Commit time, unix timestamp with millisecond precision
// if Version is 1 {
// [8] Expire time, unix timestamp with millisecond precision
// }
func (m *Message) decodeOffsetMessage(val []byte) {
buf := &buffer{data: val}
om := m.OffsetMessage
version := buf.readInt16()
om.Offset = buf.readInt64()
om.Metadata = buf.readString()
om.CommittedAt = makeTime(buf.readInt64())
if version == 1 {
om.ExpiresAt = makeTime(buf.readInt64())
}
}
func (m *Message) decodeGroupMetadata(ctx context.Context, val []byte) {
buf := &buffer{data: val}
m.GroupMessage = &GroupMessage{Members: []GroupMember{}}
gm := m.GroupMessage
version := buf.readInt16()
if version > 1 {
return
}
gm.ProtocolType = buf.readString()
gm.GenerationID = buf.readInt32()
if gm.GenerationID > 1 {
// Messages with generation greater than one often lack remaining fields
// If followin sequence is like 0xFF 0xFF 0xFF 0xFF 0x00 0x00 ...
// then just skip that shit
next := buf.readInt32()
if next == -1 {
return
}
buf.pos -= 4
}
gm.LeaderID = buf.readString()
gm.Protocol = buf.readString()
arySize := int(buf.readInt32())
for i := 0; i < arySize; i++ {
gm.Members = append(gm.Members, GroupMember{
ID: buf.readString(),
ClientID: buf.readString(),
ClientHost: buf.readString(),
SessionTimeout: makeDur(buf.readInt32()),
RebalanceTimeout: makeDur(buf.readInt32()),
Subscription: readAssignment(buf),
Assignment: readAssignment(buf),
})
}
}
func readAssignment(buf *buffer) []TopicAndPartition {
ass := []TopicAndPartition{}
buf.skip(2) // Eh
size := buf.readInt32()
if size == 0 {
buf.skip(4)
return ass
}
for i := 0; i < int(size); i++ {
ass = append(ass, readTopicAndSubscription(buf))
buf.skip(4) // Hash key or smth
}
return ass
}
func readTopicAndSubscription(buf *buffer) TopicAndPartition {
return TopicAndPartition{
Topic: buf.readString(),
Partition: buf.readInt32(),
}
}
func makeTime(ts int64) time.Time {
return time.Unix(ts/1000, (ts%1000)*1000000)
}
func makeDur(to int32) time.Duration {
return time.Duration(to) * time.Millisecond
}
//
// Buffer
//
type buffer struct {
data []byte
pos int
}
func (b *buffer) skip(n int) {
b.pos += n
}
func (b *buffer) readBytes(n int) []byte {
b.pos += n
return b.data[b.pos-n : b.pos]
}
func (b *buffer) readInt16() int16 {
i := binary.BigEndian.Uint16(b.data[b.pos:])
b.pos += 2
return int16(i)
}
func (b *buffer) readInt32() int32 {
i := binary.BigEndian.Uint32(b.data[b.pos:])
b.pos += 4
return int32(i)
}
func (b *buffer) readInt64() int64 {
i := binary.BigEndian.Uint64(b.data[b.pos:])
b.pos += 8
return int64(i)
}
func (b *buffer) readString() string {
strlen := int(b.readInt16())
b.pos += strlen
return string(b.data[b.pos-strlen : b.pos])
}