diff --git a/README.md b/README.md
index 8f3ddd8..125607b 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ So far it is only cable of printing parsed messages. For usage eample take a
look at the main command.
```
-go run cmd/main.go -brokers 127.0.0.1:9092
+go run cmd/printer/main.go -brokers 127.0.0.1:9092
```
### Design
diff --git a/cmd/main.go b/cmd/printer/main.go
similarity index 100%
rename from cmd/main.go
rename to cmd/printer/main.go
diff --git a/cmd/server/main.go b/cmd/server/main.go
new file mode 100644
index 0000000..e0007e9
--- /dev/null
+++ b/cmd/server/main.go
@@ -0,0 +1,207 @@
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "html/template"
+
+ "github.com/localhots/koff"
+)
+
+type clusterState struct {
+ consumerOffsets map[string]koff.OffsetMessage
+ consumerGroups map[string]koff.GroupMessage
+}
+
+var state = &clusterState{
+ consumerOffsets: make(map[string]koff.OffsetMessage),
+ consumerGroups: make(map[string]koff.GroupMessage),
+}
+
+var lock sync.Mutex
+
+func main() {
+ brokers := flag.String("brokers", "", "Comma separated list of brokers")
+ flag.Parse()
+
+ if *brokers == "" {
+ fmt.Println("Brokers list required")
+ flag.Usage()
+ os.Exit(1)
+ }
+
+ c, err := koff.NewConsumer(strings.Split(*brokers, ","), false)
+ if err != nil {
+ log.Fatalf("Failed to create consumer: %v", err)
+ }
+ defer c.Close()
+
+ go func() {
+ for msg := range c.Messages() {
+ state.add(msg)
+ }
+ }()
+
+ http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ w.Write(state.render())
+ })
+ http.ListenAndServe(":8080", nil)
+}
+
+func (s *clusterState) add(msg koff.Message) {
+ lock.Lock()
+ defer lock.Unlock()
+
+ if msg.OffsetMessage != nil {
+ cur, ok := s.consumerOffsets[msg.Consumer]
+ if !ok || cur.CommittedAt.Before(msg.OffsetMessage.CommittedAt) {
+ s.consumerOffsets[msg.Consumer] = *msg.OffsetMessage
+ }
+ } else {
+ cur, ok := s.consumerGroups[msg.Consumer]
+ if !ok || (msg.GroupMessage.GenerationID > cur.GenerationID && msg.GroupMessage.Complete()) {
+ s.consumerGroups[msg.Consumer] = *msg.GroupMessage
+ }
+ }
+}
+
+//
+// Render
+//
+
+var htmlTpl = template.Must(template.New("main").Parse(`
+
+
+Kafka Consumers
+
+
+
+
+
+
+Consumer Offsets
+
+
+ Consumer |
+ Topic |
+ Partition |
+ Offset |
+ Timestamp |
+
+{{range .ConsumerOffsets}}
+
+ {{.Consumer}} |
+ {{.Topic}} |
+ {{.Partition}} |
+ {{.Offset}} |
+ {{.Timestamp}} |
+
+{{end}}
+
+
+
+Consumer Groups
+
+{{range .ConsumerGroups}}
+
+
+ {{.Consumer}} |
+
+
+ Consumer |
+ Topic |
+ Partition |
+ Leader |
+
+{{range .Members}}
+{{$id := .ID}}
+{{range .Assignment}}
+
+ {{$id}} |
+ {{.Topic}} |
+ {{.Partition}} |
+ Yes |
+
+{{end}}
+{{end}}
+
+
+{{end}}
+
+
+
+`))
+
+type offsetMessage struct {
+ Consumer string
+ Timestamp string
+ koff.OffsetMessage
+}
+type groupMessage struct {
+ Consumer string
+ koff.GroupMessage
+}
+
+func (s *clusterState) render() []byte {
+ var tpl struct {
+ ConsumerOffsets []offsetMessage
+ ConsumerGroups []groupMessage
+ }
+
+ lock.Lock()
+ defer lock.Unlock()
+
+ for k, m := range s.consumerOffsets {
+ if strings.HasPrefix(k, "console-consumer") {
+ continue
+ }
+ tpl.ConsumerOffsets = append(tpl.ConsumerOffsets, offsetMessage{
+ Consumer: k,
+ OffsetMessage: m,
+ Timestamp: m.CommittedAt.Format(time.Stamp),
+ })
+ }
+ sort.Slice(tpl.ConsumerOffsets, func(i, j int) bool {
+ return tpl.ConsumerOffsets[i].Consumer < tpl.ConsumerOffsets[j].Consumer
+ })
+
+ for k, m := range s.consumerGroups {
+ // if strings.HasPrefix(k, "console-consumer") {
+ // continue
+ // }
+ tpl.ConsumerGroups = append(tpl.ConsumerGroups, groupMessage{
+ Consumer: k,
+ GroupMessage: m,
+ })
+ }
+ sort.Slice(tpl.ConsumerGroups, func(i, j int) bool {
+ return tpl.ConsumerGroups[i].Consumer < tpl.ConsumerGroups[j].Consumer
+ })
+
+ var buf bytes.Buffer
+ err := htmlTpl.Execute(&buf, tpl)
+ if err != nil {
+ panic(err)
+ }
+
+ return buf.Bytes()
+}
diff --git a/decoder.go b/decoder.go
index 03f3d37..230f357 100644
--- a/decoder.go
+++ b/decoder.go
@@ -76,6 +76,11 @@ func Decode(ctx context.Context, key, val []byte) Message {
return m
}
+// Complete returns true if message is complete.
+func (gm GroupMessage) Complete() bool {
+ return gm.LeaderID != ""
+}
+
// Key structure:
// [2] Version, uint16 big endian
// [2] Consumer name length, uint16 big endian