1
0
Fork 0

Implement Kafka messaging backend

This commit is contained in:
Gregory Eremin 2015-10-16 02:17:39 +03:00
parent 4db43004ae
commit 06589b9ba2
2 changed files with 168 additions and 0 deletions

13
backend/backend.go Normal file
View File

@ -0,0 +1,13 @@
package backend
// Backend is the interface that should be implemented by Satan backends.
type Backend interface {
NewConsumer(topic string, partition int32, offset int64) (c Consumer, err error)
Publish(topic string, msg []byte) error
Close() error
}
// Consumer is the interface that should be implemented by backend consumer.
type Consumer interface {
NextMessage() (msg []byte, err error)
}

155
backend/kafka/kafka.go Normal file
View File

@ -0,0 +1,155 @@
package kafka
import (
"errors"
"fmt"
"github.com/Shopify/sarama"
"github.com/localhots/satan/backend"
)
// Client is the Kafka adapter client for Satan.
type Client struct {
client sarama.Client
producer sarama.SyncProducer
consumer sarama.Consumer
pconsumers map[string]*Consumer
shutdown chan struct{}
}
// Consumer is a wrapper for Kafka partition consumer.
type Consumer struct {
Topic string
Partition int32
Offset int64
consumer sarama.PartitionConsumer
shutdown chan struct{}
}
var (
// ErrConsumerNotInitialized is returned when NewConsumer is called before
// initializing Kafka consumer.
ErrConsumerNotInitialized = errors.New("consumer is not initialized")
// ErrProducerNotInitialized is returned when Publish is called before
// initializing Kafka producer.
ErrProducerNotInitialized = errors.New("producer is not initialized")
)
// New creates a new instance of Client.
func New(id string, brokers []string) (c *Client, err error) {
conf := sarama.NewConfig()
conf.ClientID = id
conf.Consumer.Return.Errors = true
client, err := sarama.NewClient(brokers, conf)
if err != nil {
return nil, err
}
return &Client{
client: client,
pconsumers: make(map[string]*Consumer),
shutdown: make(chan struct{}),
}, nil
}
// InitializeProducer initializes Kafka producer.
func (c *Client) InitializeProducer() error {
if c.producer == nil {
var err error
if c.producer, err = sarama.NewSyncProducerFromClient(c.client); err != nil {
return err
}
}
return nil
}
// InitializeConsumer initializes Kafka consumer.
func (c *Client) InitializeConsumer() error {
if c.consumer == nil {
var err error
if c.consumer, err = sarama.NewConsumerFromClient(c.client); err != nil {
return err
}
}
return nil
}
// NewConsumer creates a new partition consumer.
func (c *Client) NewConsumer(topic string, partition int32, offset int64) (cb backend.Consumer, err error) {
if c.consumer == nil {
return nil, ErrConsumerNotInitialized
}
pcons, err := c.consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return nil, err
}
cons := &Consumer{
Topic: topic,
Partition: partition,
Offset: offset,
consumer: pcons,
shutdown: make(chan struct{}),
}
name := fmt.Sprintf("%s-%d-#%d", topic, partition, len(c.pconsumers)+1)
c.pconsumers[name] = cons
return cons, nil
}
// Publish sends a message to producer.
func (c *Client) Publish(topic string, body []byte) error {
if c.producer == nil {
return ErrProducerNotInitialized
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(body),
}
_, _, err := c.producer.SendMessage(msg)
return err
}
// Close shuts down Kafka producer and consumers.
func (c *Client) Close() error {
for _, pc := range c.pconsumers {
if err := pc.Close(); err != nil {
return err
}
}
if err := c.consumer.Close(); err != nil {
return err
}
if err := c.producer.Close(); err != nil {
return err
}
return nil
}
// NextMessage returns a pair of next message and next error. If both are nil,
// there are no messages left.
func (c *Consumer) NextMessage() (msg []byte, err error) {
select {
case err := <-c.consumer.Errors():
return nil, err.Err
case msg := <-c.consumer.Messages():
c.Offset = msg.Offset
return msg.Value, nil
case <-c.shutdown:
return nil, nil
}
}
// Close shuts down partition consumer.
func (c *Consumer) Close() error {
return c.consumer.Close()
}