Move example Kafka implementation to a separate package

This commit is contained in:
Gregory Eremin 2015-10-17 04:10:16 +03:00
parent 6acd520f08
commit 290b1dfd4e
2 changed files with 30 additions and 25 deletions

View File

@ -1,4 +1,4 @@
package main package kafka
import ( import (
"encoding/json" "encoding/json"
@ -11,14 +11,14 @@ import (
"github.com/localhots/satan" "github.com/localhots/satan"
) )
// KafkaConsumerState contains data that is required to create a Kafka consumer. // ConsumerState contains data that is required to create a Kafka consumer.
type KafkaConsumerState struct { type ConsumerState struct {
Partition int32 `json:"partition"` Partition int32 `json:"partition"`
Offset int64 `json:"offset"` Offset int64 `json:"offset"`
} }
// KafkaStream is an implementation of satan.Stremer for Kafka messaging queue. // Stream is an implementation of satan.Stremer for Kafka messaging queue.
type KafkaStream struct { type Stream struct {
messages chan []byte messages chan []byte
shutdown chan struct{} shutdown chan struct{}
} }
@ -30,10 +30,11 @@ const (
var ( var (
kafkaClient sarama.Client kafkaClient sarama.Client
kafkaConsumer sarama.Consumer kafkaConsumer sarama.Consumer
consumers = map[string]map[string]KafkaConsumerState{} consumers = map[string]map[string]ConsumerState{}
) )
func initKafka(brokers []string) { // Initialize sets up the kafka package.
func Initialize(brokers []string) {
conf := sarama.NewConfig() conf := sarama.NewConfig()
conf.ClientID = "Satan Example" conf.ClientID = "Satan Example"
@ -44,9 +45,12 @@ func initKafka(brokers []string) {
if kafkaConsumer, err = sarama.NewConsumerFromClient(kafkaClient); err != nil { if kafkaConsumer, err = sarama.NewConsumerFromClient(kafkaClient); err != nil {
panic(err) panic(err)
} }
loadConsumerConfig()
} }
func shutdownKafka() { // Shutdown shuts down the kafka package.
func Shutdown() {
if err := kafkaConsumer.Close(); err != nil { if err := kafkaConsumer.Close(); err != nil {
panic(err) panic(err)
} }
@ -55,7 +59,8 @@ func shutdownKafka() {
} }
} }
func makeStream(consumer, topic string) satan.Streamer { // MakeStream creates a satan.Streamer implementation for Kafka messaging queue.
func MakeStream(consumer, topic string) satan.Streamer {
c, ok := consumers[consumer] c, ok := consumers[consumer]
if !ok { if !ok {
panic(fmt.Errorf("Consumer %q has no config", consumer)) panic(fmt.Errorf("Consumer %q has no config", consumer))
@ -70,7 +75,7 @@ func makeStream(consumer, topic string) satan.Streamer {
panic(err) panic(err)
} }
stream := &KafkaStream{ stream := &Stream{
messages: make(chan []byte), messages: make(chan []byte),
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
} }
@ -91,6 +96,16 @@ func makeStream(consumer, topic string) satan.Streamer {
return stream return stream
} }
// Messages returns a channel that stream messages.
func (s *Stream) Messages() <-chan []byte {
return s.messages
}
// Close stops Kafka partition consumer.
func (s *Stream) Close() {
close(s.shutdown)
}
func loadConsumerConfig() { func loadConsumerConfig() {
if b, err := ioutil.ReadFile(consumerStateFile); err != nil { if b, err := ioutil.ReadFile(consumerStateFile); err != nil {
fmt.Println(`Kafka consumers state file was not found at ` + consumerStateFile + ` fmt.Println(`Kafka consumers state file was not found at ` + consumerStateFile + `
@ -111,13 +126,3 @@ Config file contents should look like this:
} }
} }
} }
// Messages returns a channel that stream messages.
func (s *KafkaStream) Messages() <-chan []byte {
return s.messages
}
// Close stops Kafka partition consumer.
func (s *KafkaStream) Close() {
close(s.shutdown)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/localhots/satan" "github.com/localhots/satan"
"github.com/localhots/satan/example/daemons" "github.com/localhots/satan/example/daemons"
"github.com/localhots/satan/example/kafka"
) )
func main() { func main() {
@ -17,7 +18,7 @@ func main() {
var brokers string var brokers string
flag.BoolVar(&debug, "v", false, "Verbose mode") flag.BoolVar(&debug, "v", false, "Verbose mode")
flag.StringVar(&brokers, "brokers", "127.0.0.1:9092", "Kafka broker addresses (separated by space)") flag.StringVar(&brokers, "brokers", "127.0.0.1:9092", "Kafka broker addresses separated by space")
flag.Parse() flag.Parse()
log.SetOutput(ioutil.Discard) log.SetOutput(ioutil.Discard)
@ -25,12 +26,11 @@ func main() {
log.SetOutput(os.Stderr) log.SetOutput(os.Stderr)
} }
loadConsumerConfig() kafka.Initialize(strings.Split(brokers, " "))
initKafka(strings.Split(brokers, " ")) defer kafka.Shutdown()
defer shutdownKafka()
s := satan.Summon() s := satan.Summon()
s.SubscribeFunc = makeStream s.SubscribeFunc = kafka.MakeStream
s.AddDaemon(&daemons.NumberPrinter{}) s.AddDaemon(&daemons.NumberPrinter{})
s.AddDaemon(&daemons.PriceConsumer{}) s.AddDaemon(&daemons.PriceConsumer{})