diff --git a/consumer/consumer.go b/consumer/consumer.go new file mode 100644 index 0000000..f67aec4 --- /dev/null +++ b/consumer/consumer.go @@ -0,0 +1,76 @@ +package consumer + +import ( + "errors" + "fmt" + + "github.com/localhots/caller" + "github.com/localhots/shezmu" +) + +// Subscriber is the interface that is used by daemons to subscribe to messages. +type Subscriber interface { + Subscribe(consumerName, topic string) Streamer +} + +// Streamer is the interface that wraps message consumers. Error handling +// should be provided by the implementation. Feel free to panic. +type Streamer interface { + Messages() <-chan []byte + Close() +} + +// Publisher is the interface that wraps message publishers. Error handling +// should be provided by the implementation. Feel free to panic. +type Publisher interface { + Publish(topic string, msg []byte, meta interface{}) + Close() +} + +// Consumer extends Shezmu's BaseDaemon with pub/sub features. +type Consumer struct { + shezmu.BaseDaemon + publisher Publisher + subscriber Subscriber +} + +var ( + errMissingSubscriber = errors.New("subscriber is not set up") + errMissingPublisher = errors.New("publisher is not set up") +) + +// Publish sends a message to the publisher. +func (c *Consumer) Publish(topic string, msg []byte, meta interface{}) { + if c.publisher == nil { + panic(errMissingPublisher) + } + + c.publisher.Publish(topic, msg, meta) +} + +// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg. +func (c *Consumer) Subscribe(topic string, fun interface{}) { + name := fmt.Sprintf("subscription for topic %q", topic) + c.SystemProcess(name, func() { + if c.subscriber == nil { + panic(errMissingSubscriber) + } + + stream := c.subscriber.Subscribe(c.String(), topic) + defer stream.Close() + + cf, err := caller.New(fun) + if err != nil { + panic(err) + } + + for { + select { + case msg := <-stream.Messages(): + c.Process(func() { cf.Call(msg) }) + case <-c.ShutdownRequested(): + return + } + } + }) +} diff --git a/daemon.go b/daemon.go index 2391b3a..23f3dd9 100644 --- a/daemon.go +++ b/daemon.go @@ -1,13 +1,11 @@ package shezmu import ( - "errors" "fmt" "strings" "time" "github.com/juju/ratelimit" - "github.com/localhots/shezmu/caller" ) // Daemon is the interface that contains a set of methods required to be @@ -52,8 +50,6 @@ type BaseDaemon struct { queue chan<- *task logger Logger panicHandler PanicHandler - subscriber Subscriber - publisher Publisher shutdown chan struct{} limit *ratelimit.Bucket } @@ -61,11 +57,6 @@ type BaseDaemon struct { // PanicHandler is a function that handles panics. Duh! type PanicHandler func(interface{}) -var ( - errMissingSubscriber = errors.New("subscriber is not set up") - errMissingPublisher = errors.New("publisher is not set up") -) - // Process creates a task and then adds it to processing queue. func (d *BaseDaemon) Process(a Actor) { if d.limit != nil { @@ -76,12 +67,17 @@ func (d *BaseDaemon) Process(a Actor) { daemon: d.self, actor: a, createdAt: time.Now(), + name: "Actor", }) } // SystemProcess creates a system task that is restarted in case of failure // and then adds it to processing queue. func (d *BaseDaemon) SystemProcess(name string, a Actor) { + if name == "" { + name = "SystemProcess" + } + d.tryEnqueue(&task{ daemon: d.self, actor: a, @@ -91,42 +87,6 @@ func (d *BaseDaemon) SystemProcess(name string, a Actor) { }) } -// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg. -func (d *BaseDaemon) Subscribe(topic string, fun interface{}) { - name := fmt.Sprintf("subscription for topic %q", topic) - d.SystemProcess(name, func() { - if d.subscriber == nil { - panic(errMissingSubscriber) - } - - stream := d.subscriber.Subscribe(d.String(), topic) - defer stream.Close() - - cf, err := caller.New(fun) - if err != nil { - panic(err) - } - - for { - select { - case msg := <-stream.Messages(): - d.Process(func() { cf.Call(msg) }) - case <-d.shutdown: - return - } - } - }) -} - -// Publish sends a message to the publisher. -func (d *BaseDaemon) Publish(topic string, msg []byte, meta interface{}) { - if d.publisher == nil { - panic(errMissingPublisher) - } - - d.publisher.Publish(topic, msg, meta) -} - // LimitRate limits the daemons' processing rate. func (d *BaseDaemon) LimitRate(times int, per time.Duration) { rate := float64(time.Second) / float64(per) * float64(times) diff --git a/shezmu.go b/shezmu.go index 9bf04d6..9169374 100644 --- a/shezmu.go +++ b/shezmu.go @@ -13,8 +13,6 @@ import ( // Shezmu is the master daemon. type Shezmu struct { - Subscriber Subscriber - Publisher Publisher DaemonStats stats.Publisher Logger Logger NumWorkers int @@ -32,25 +30,6 @@ type Shezmu struct { // Actor is a function that could be executed by daemon workers. type Actor func() -// Subscriber is the interface that is used by daemons to subscribe to messages. -type Subscriber interface { - Subscribe(consumerName, topic string) Streamer -} - -// Streamer is the interface that wraps message consumers. Error handling -// should be provided by the implementation. Feel free to panic. -type Streamer interface { - Messages() <-chan []byte - Close() -} - -// Publisher is the interface that wraps message publishers. Error handling -// should be provided by the implementation. Feel free to panic. -type Publisher interface { - Publish(topic string, msg []byte, meta interface{}) - Close() -} - // Logger is the interface that implements minimal logging functions. type Logger interface { Printf(format string, v ...interface{}) @@ -88,8 +67,6 @@ func Summon() *Shezmu { func (s *Shezmu) AddDaemon(d Daemon) { base := d.base() base.self = d - base.subscriber = s.Subscriber - base.publisher = s.Publisher base.queue = s.queue base.logger = s.Logger base.shutdown = s.shutdownSystem @@ -232,9 +209,5 @@ func (s *Shezmu) processGeneralTask(t *task) { } func (t *task) String() string { - if t.name == "" { - return fmt.Sprintf("[unnamed %s process]", t.daemon) - } - return fmt.Sprintf("%s[%s]", t.daemon, t.name) }