From bc1f5cf474888294f28d29c3ca4a0655b9aabad9 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Mon, 30 Jul 2018 21:50:16 +0200 Subject: [PATCH] Add simple http server and dashboard prototype --- README.md | 2 +- cmd/{ => printer}/main.go | 0 cmd/server/main.go | 207 ++++++++++++++++++++++++++++++++++++++ decoder.go | 5 + 4 files changed, 213 insertions(+), 1 deletion(-) rename cmd/{ => printer}/main.go (100%) create mode 100644 cmd/server/main.go diff --git a/README.md b/README.md index 8f3ddd8..125607b 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ So far it is only cable of printing parsed messages. For usage eample take a look at the main command. ``` -go run cmd/main.go -brokers 127.0.0.1:9092 +go run cmd/printer/main.go -brokers 127.0.0.1:9092 ``` ### Design diff --git a/cmd/main.go b/cmd/printer/main.go similarity index 100% rename from cmd/main.go rename to cmd/printer/main.go diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..e0007e9 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,207 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "log" + "net/http" + "os" + "sort" + "strings" + "sync" + "time" + + "html/template" + + "github.com/localhots/koff" +) + +type clusterState struct { + consumerOffsets map[string]koff.OffsetMessage + consumerGroups map[string]koff.GroupMessage +} + +var state = &clusterState{ + consumerOffsets: make(map[string]koff.OffsetMessage), + consumerGroups: make(map[string]koff.GroupMessage), +} + +var lock sync.Mutex + +func main() { + brokers := flag.String("brokers", "", "Comma separated list of brokers") + flag.Parse() + + if *brokers == "" { + fmt.Println("Brokers list required") + flag.Usage() + os.Exit(1) + } + + c, err := koff.NewConsumer(strings.Split(*brokers, ","), false) + if err != nil { + log.Fatalf("Failed to create consumer: %v", err) + } + defer c.Close() + + go func() { + for msg := range c.Messages() { + state.add(msg) + } + }() + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write(state.render()) + }) + http.ListenAndServe(":8080", nil) +} + +func (s *clusterState) add(msg koff.Message) { + lock.Lock() + defer lock.Unlock() + + if msg.OffsetMessage != nil { + cur, ok := s.consumerOffsets[msg.Consumer] + if !ok || cur.CommittedAt.Before(msg.OffsetMessage.CommittedAt) { + s.consumerOffsets[msg.Consumer] = *msg.OffsetMessage + } + } else { + cur, ok := s.consumerGroups[msg.Consumer] + if !ok || (msg.GroupMessage.GenerationID > cur.GenerationID && msg.GroupMessage.Complete()) { + s.consumerGroups[msg.Consumer] = *msg.GroupMessage + } + } +} + +// +// Render +// + +var htmlTpl = template.Must(template.New("main").Parse(` + + +Kafka Consumers + + + + + + +

Consumer Offsets

+ + + + + + + + +{{range .ConsumerOffsets}} + + + + + + + +{{end}} +
ConsumerTopicPartitionOffsetTimestamp
{{.Consumer}}{{.Topic}}{{.Partition}}{{.Offset}}{{.Timestamp}}
+
+ +

Consumer Groups

+ +{{range .ConsumerGroups}} + + + + + + + + + + +{{range .Members}} +{{$id := .ID}} +{{range .Assignment}} + + + + + + +{{end}} +{{end}} +
{{.Consumer}}
ConsumerTopicPartitionLeader
{{$id}}{{.Topic}}{{.Partition}}Yes
+
+{{end}} + + + +`)) + +type offsetMessage struct { + Consumer string + Timestamp string + koff.OffsetMessage +} +type groupMessage struct { + Consumer string + koff.GroupMessage +} + +func (s *clusterState) render() []byte { + var tpl struct { + ConsumerOffsets []offsetMessage + ConsumerGroups []groupMessage + } + + lock.Lock() + defer lock.Unlock() + + for k, m := range s.consumerOffsets { + if strings.HasPrefix(k, "console-consumer") { + continue + } + tpl.ConsumerOffsets = append(tpl.ConsumerOffsets, offsetMessage{ + Consumer: k, + OffsetMessage: m, + Timestamp: m.CommittedAt.Format(time.Stamp), + }) + } + sort.Slice(tpl.ConsumerOffsets, func(i, j int) bool { + return tpl.ConsumerOffsets[i].Consumer < tpl.ConsumerOffsets[j].Consumer + }) + + for k, m := range s.consumerGroups { + // if strings.HasPrefix(k, "console-consumer") { + // continue + // } + tpl.ConsumerGroups = append(tpl.ConsumerGroups, groupMessage{ + Consumer: k, + GroupMessage: m, + }) + } + sort.Slice(tpl.ConsumerGroups, func(i, j int) bool { + return tpl.ConsumerGroups[i].Consumer < tpl.ConsumerGroups[j].Consumer + }) + + var buf bytes.Buffer + err := htmlTpl.Execute(&buf, tpl) + if err != nil { + panic(err) + } + + return buf.Bytes() +} diff --git a/decoder.go b/decoder.go index 03f3d37..230f357 100644 --- a/decoder.go +++ b/decoder.go @@ -76,6 +76,11 @@ func Decode(ctx context.Context, key, val []byte) Message { return m } +// Complete returns true if message is complete. +func (gm GroupMessage) Complete() bool { + return gm.LeaderID != "" +} + // Key structure: // [2] Version, uint16 big endian // [2] Consumer name length, uint16 big endian