1
0
Fork 0

Move consumer features into a subpackage

This commit is contained in:
Gregory Eremin 2016-07-26 22:55:36 +02:00
parent 3d68c57224
commit a7db215c45
3 changed files with 81 additions and 72 deletions

76
consumer/consumer.go Normal file
View File

@ -0,0 +1,76 @@
package consumer
import (
"errors"
"fmt"
"github.com/localhots/caller"
"github.com/localhots/shezmu"
)
// Subscriber is the interface that is used by daemons to subscribe to messages.
type Subscriber interface {
Subscribe(consumerName, topic string) Streamer
}
// Streamer is the interface that wraps message consumers. Error handling
// should be provided by the implementation. Feel free to panic.
type Streamer interface {
Messages() <-chan []byte
Close()
}
// Publisher is the interface that wraps message publishers. Error handling
// should be provided by the implementation. Feel free to panic.
type Publisher interface {
Publish(topic string, msg []byte, meta interface{})
Close()
}
// Consumer extends Shezmu's BaseDaemon with pub/sub features.
type Consumer struct {
shezmu.BaseDaemon
publisher Publisher
subscriber Subscriber
}
var (
errMissingSubscriber = errors.New("subscriber is not set up")
errMissingPublisher = errors.New("publisher is not set up")
)
// Publish sends a message to the publisher.
func (c *Consumer) Publish(topic string, msg []byte, meta interface{}) {
if c.publisher == nil {
panic(errMissingPublisher)
}
c.publisher.Publish(topic, msg, meta)
}
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (c *Consumer) Subscribe(topic string, fun interface{}) {
name := fmt.Sprintf("subscription for topic %q", topic)
c.SystemProcess(name, func() {
if c.subscriber == nil {
panic(errMissingSubscriber)
}
stream := c.subscriber.Subscribe(c.String(), topic)
defer stream.Close()
cf, err := caller.New(fun)
if err != nil {
panic(err)
}
for {
select {
case msg := <-stream.Messages():
c.Process(func() { cf.Call(msg) })
case <-c.ShutdownRequested():
return
}
}
})
}

View File

@ -1,13 +1,11 @@
package shezmu
import (
"errors"
"fmt"
"strings"
"time"
"github.com/juju/ratelimit"
"github.com/localhots/shezmu/caller"
)
// Daemon is the interface that contains a set of methods required to be
@ -52,8 +50,6 @@ type BaseDaemon struct {
queue chan<- *task
logger Logger
panicHandler PanicHandler
subscriber Subscriber
publisher Publisher
shutdown chan struct{}
limit *ratelimit.Bucket
}
@ -61,11 +57,6 @@ type BaseDaemon struct {
// PanicHandler is a function that handles panics. Duh!
type PanicHandler func(interface{})
var (
errMissingSubscriber = errors.New("subscriber is not set up")
errMissingPublisher = errors.New("publisher is not set up")
)
// Process creates a task and then adds it to processing queue.
func (d *BaseDaemon) Process(a Actor) {
if d.limit != nil {
@ -76,12 +67,17 @@ func (d *BaseDaemon) Process(a Actor) {
daemon: d.self,
actor: a,
createdAt: time.Now(),
name: "Actor",
})
}
// SystemProcess creates a system task that is restarted in case of failure
// and then adds it to processing queue.
func (d *BaseDaemon) SystemProcess(name string, a Actor) {
if name == "" {
name = "SystemProcess"
}
d.tryEnqueue(&task{
daemon: d.self,
actor: a,
@ -91,42 +87,6 @@ func (d *BaseDaemon) SystemProcess(name string, a Actor) {
})
}
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (d *BaseDaemon) Subscribe(topic string, fun interface{}) {
name := fmt.Sprintf("subscription for topic %q", topic)
d.SystemProcess(name, func() {
if d.subscriber == nil {
panic(errMissingSubscriber)
}
stream := d.subscriber.Subscribe(d.String(), topic)
defer stream.Close()
cf, err := caller.New(fun)
if err != nil {
panic(err)
}
for {
select {
case msg := <-stream.Messages():
d.Process(func() { cf.Call(msg) })
case <-d.shutdown:
return
}
}
})
}
// Publish sends a message to the publisher.
func (d *BaseDaemon) Publish(topic string, msg []byte, meta interface{}) {
if d.publisher == nil {
panic(errMissingPublisher)
}
d.publisher.Publish(topic, msg, meta)
}
// LimitRate limits the daemons' processing rate.
func (d *BaseDaemon) LimitRate(times int, per time.Duration) {
rate := float64(time.Second) / float64(per) * float64(times)

View File

@ -13,8 +13,6 @@ import (
// Shezmu is the master daemon.
type Shezmu struct {
Subscriber Subscriber
Publisher Publisher
DaemonStats stats.Publisher
Logger Logger
NumWorkers int
@ -32,25 +30,6 @@ type Shezmu struct {
// Actor is a function that could be executed by daemon workers.
type Actor func()
// Subscriber is the interface that is used by daemons to subscribe to messages.
type Subscriber interface {
Subscribe(consumerName, topic string) Streamer
}
// Streamer is the interface that wraps message consumers. Error handling
// should be provided by the implementation. Feel free to panic.
type Streamer interface {
Messages() <-chan []byte
Close()
}
// Publisher is the interface that wraps message publishers. Error handling
// should be provided by the implementation. Feel free to panic.
type Publisher interface {
Publish(topic string, msg []byte, meta interface{})
Close()
}
// Logger is the interface that implements minimal logging functions.
type Logger interface {
Printf(format string, v ...interface{})
@ -88,8 +67,6 @@ func Summon() *Shezmu {
func (s *Shezmu) AddDaemon(d Daemon) {
base := d.base()
base.self = d
base.subscriber = s.Subscriber
base.publisher = s.Publisher
base.queue = s.queue
base.logger = s.Logger
base.shutdown = s.shutdownSystem
@ -232,9 +209,5 @@ func (s *Shezmu) processGeneralTask(t *task) {
}
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon)
}
return fmt.Sprintf("%s[%s]", t.daemon, t.name)
}