From 58007b38ab90dfd182da6b7197239f09305cb16a Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 10 Jan 2016 21:01:53 +0300 Subject: [PATCH] Add Subscriber interface as a replacement for SubscribeFunc --- daemon.go | 28 ++++++++++++++-------------- example/kafka/kafka.go | 5 ++++- satan.go | 18 ++++++++++-------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/daemon.go b/daemon.go index 972b53d..48ad5bf 100644 --- a/daemon.go +++ b/daemon.go @@ -45,23 +45,23 @@ type Daemon interface { // BaseDaemon is the parent structure for all daemons. type BaseDaemon struct { - self Daemon - name string - queue chan<- *task - logger *log.Logger - panicHandler PanicHandler - subscribeFunc SubscribeFunc - publisher Publisher - shutdown chan struct{} - limit *ratelimit.Bucket + self Daemon + name string + queue chan<- *task + logger *log.Logger + panicHandler PanicHandler + subscriber Subscriber + publisher Publisher + shutdown chan struct{} + limit *ratelimit.Bucket } // PanicHandler is a function that handles panics. Duh! type PanicHandler func(interface{}) var ( - errMissingSubscriptionFun = errors.New("subscription function is not set up") - errMissingPublisher = errors.New("publisher is not set up") + errMissingSubscriber = errors.New("subscriber is not set up") + errMissingPublisher = errors.New("publisher is not set up") ) // Process creates a task and then adds it to processing queue. @@ -92,11 +92,11 @@ func (d *BaseDaemon) SystemProcess(name string, a Actor) { func (d *BaseDaemon) Subscribe(topic string, fun interface{}) { name := fmt.Sprintf("subscription for topic %q", topic) d.SystemProcess(name, func() { - if d.subscribeFunc == nil { - panic(errMissingSubscriptionFun) + if d.subscriber == nil { + panic(errMissingSubscriber) } - stream := d.subscribeFunc(d.String(), topic) + stream := d.subscriber.Subscribe(d.String(), topic) defer stream.Close() cf, err := caller.New(fun) diff --git a/example/kafka/kafka.go b/example/kafka/kafka.go index b2978bd..5c5acc6 100644 --- a/example/kafka/kafka.go +++ b/example/kafka/kafka.go @@ -18,6 +18,9 @@ type ConsumerState struct { Offset int64 `json:"offset"` } +// Subscriber is a dummy structure that implements satan.Subscriber interface. +type Subscriber struct{} + // Stream is an implementation of satan.Stremer for Kafka messaging queue. type Stream struct { messages chan []byte @@ -68,7 +71,7 @@ func Shutdown() { } // Subscribe creates a satan.Streamer implementation for Kafka messaging queue. -func Subscribe(consumer, topic string) satan.Streamer { +func (s Subscriber) Subscribe(consumer, topic string) satan.Streamer { c, ok := consumers[consumer] if !ok { panic(fmt.Errorf("Consumer %q has no config", consumer)) diff --git a/satan.go b/satan.go index 7c52e70..78ac167 100644 --- a/satan.go +++ b/satan.go @@ -13,11 +13,11 @@ import ( // Satan is the master daemon. type Satan struct { - SubscribeFunc SubscribeFunc - Publisher Publisher - DaemonStats stats.Publisher - Logger *log.Logger - NumWorkers int + Subscriber Subscriber + Publisher Publisher + DaemonStats stats.Publisher + Logger *log.Logger + NumWorkers int daemons []Daemon queue chan *task @@ -32,8 +32,10 @@ type Satan struct { // Actor is a function that could be executed by daemon workers. type Actor func() -// SubscribeFunc is a function that is used by daemons to subscribe to messages. -type SubscribeFunc func(consumer, topic string) Streamer +// Subscriber is the interface that is used by daemons to subscribe to messages. +type Subscriber interface { + Subscribe(consumer, topic string) Streamer +} // Streamer is the interface that wraps message consumers. Error handling // should be provided by the implementation. Feel free to panic. @@ -78,7 +80,7 @@ func Summon() *Satan { func (s *Satan) AddDaemon(d Daemon) { base := d.base() base.self = d - base.subscribeFunc = s.SubscribeFunc + base.subscriber = s.Subscriber base.publisher = s.Publisher base.queue = s.queue base.logger = s.Logger