1
0
Fork 0

Add Subscriber interface as a replacement for SubscribeFunc

This commit is contained in:
Gregory Eremin 2016-01-10 21:01:53 +03:00
parent adde6a54ab
commit 58007b38ab
3 changed files with 28 additions and 23 deletions

View File

@ -45,23 +45,23 @@ type Daemon interface {
// BaseDaemon is the parent structure for all daemons. // BaseDaemon is the parent structure for all daemons.
type BaseDaemon struct { type BaseDaemon struct {
self Daemon self Daemon
name string name string
queue chan<- *task queue chan<- *task
logger *log.Logger logger *log.Logger
panicHandler PanicHandler panicHandler PanicHandler
subscribeFunc SubscribeFunc subscriber Subscriber
publisher Publisher publisher Publisher
shutdown chan struct{} shutdown chan struct{}
limit *ratelimit.Bucket limit *ratelimit.Bucket
} }
// PanicHandler is a function that handles panics. Duh! // PanicHandler is a function that handles panics. Duh!
type PanicHandler func(interface{}) type PanicHandler func(interface{})
var ( var (
errMissingSubscriptionFun = errors.New("subscription function is not set up") errMissingSubscriber = errors.New("subscriber is not set up")
errMissingPublisher = errors.New("publisher is not set up") errMissingPublisher = errors.New("publisher is not set up")
) )
// Process creates a task and then adds it to processing queue. // Process creates a task and then adds it to processing queue.
@ -92,11 +92,11 @@ func (d *BaseDaemon) SystemProcess(name string, a Actor) {
func (d *BaseDaemon) Subscribe(topic string, fun interface{}) { func (d *BaseDaemon) Subscribe(topic string, fun interface{}) {
name := fmt.Sprintf("subscription for topic %q", topic) name := fmt.Sprintf("subscription for topic %q", topic)
d.SystemProcess(name, func() { d.SystemProcess(name, func() {
if d.subscribeFunc == nil { if d.subscriber == nil {
panic(errMissingSubscriptionFun) panic(errMissingSubscriber)
} }
stream := d.subscribeFunc(d.String(), topic) stream := d.subscriber.Subscribe(d.String(), topic)
defer stream.Close() defer stream.Close()
cf, err := caller.New(fun) cf, err := caller.New(fun)

View File

@ -18,6 +18,9 @@ type ConsumerState struct {
Offset int64 `json:"offset"` Offset int64 `json:"offset"`
} }
// Subscriber is a dummy structure that implements satan.Subscriber interface.
type Subscriber struct{}
// Stream is an implementation of satan.Stremer for Kafka messaging queue. // Stream is an implementation of satan.Stremer for Kafka messaging queue.
type Stream struct { type Stream struct {
messages chan []byte messages chan []byte
@ -68,7 +71,7 @@ func Shutdown() {
} }
// Subscribe creates a satan.Streamer implementation for Kafka messaging queue. // Subscribe creates a satan.Streamer implementation for Kafka messaging queue.
func Subscribe(consumer, topic string) satan.Streamer { func (s Subscriber) Subscribe(consumer, topic string) satan.Streamer {
c, ok := consumers[consumer] c, ok := consumers[consumer]
if !ok { if !ok {
panic(fmt.Errorf("Consumer %q has no config", consumer)) panic(fmt.Errorf("Consumer %q has no config", consumer))

View File

@ -13,11 +13,11 @@ import (
// Satan is the master daemon. // Satan is the master daemon.
type Satan struct { type Satan struct {
SubscribeFunc SubscribeFunc Subscriber Subscriber
Publisher Publisher Publisher Publisher
DaemonStats stats.Publisher DaemonStats stats.Publisher
Logger *log.Logger Logger *log.Logger
NumWorkers int NumWorkers int
daemons []Daemon daemons []Daemon
queue chan *task queue chan *task
@ -32,8 +32,10 @@ type Satan struct {
// Actor is a function that could be executed by daemon workers. // Actor is a function that could be executed by daemon workers.
type Actor func() type Actor func()
// SubscribeFunc is a function that is used by daemons to subscribe to messages. // Subscriber is the interface that is used by daemons to subscribe to messages.
type SubscribeFunc func(consumer, topic string) Streamer type Subscriber interface {
Subscribe(consumer, topic string) Streamer
}
// Streamer is the interface that wraps message consumers. Error handling // Streamer is the interface that wraps message consumers. Error handling
// should be provided by the implementation. Feel free to panic. // should be provided by the implementation. Feel free to panic.
@ -78,7 +80,7 @@ func Summon() *Satan {
func (s *Satan) AddDaemon(d Daemon) { func (s *Satan) AddDaemon(d Daemon) {
base := d.base() base := d.base()
base.self = d base.self = d
base.subscribeFunc = s.SubscribeFunc base.subscriber = s.Subscriber
base.publisher = s.Publisher base.publisher = s.Publisher
base.queue = s.queue base.queue = s.queue
base.logger = s.Logger base.logger = s.Logger