1
0
Fork 0
shezmu/consumer/consumer.go

77 lines
1.7 KiB
Go

package consumer
import (
"errors"
"fmt"
"github.com/localhots/caller"
"github.com/localhots/shezmu"
)
// Subscriber is the interface that is used by daemons to subscribe to messages.
type Subscriber interface {
Subscribe(consumerName, topic string) Streamer
}
// Streamer is the interface that wraps message consumers. Error handling
// should be provided by the implementation. Feel free to panic.
type Streamer interface {
Messages() <-chan []byte
Close()
}
// Publisher is the interface that wraps message publishers. Error handling
// should be provided by the implementation. Feel free to panic.
type Publisher interface {
Publish(topic string, msg []byte, meta interface{})
Close()
}
// Consumer extends Shezmu's BaseDaemon with pub/sub features.
type Consumer struct {
shezmu.BaseDaemon
publisher Publisher
subscriber Subscriber
}
var (
errMissingSubscriber = errors.New("subscriber is not set up")
errMissingPublisher = errors.New("publisher is not set up")
)
// Publish sends a message to the publisher.
func (c *Consumer) Publish(topic string, msg []byte, meta interface{}) {
if c.publisher == nil {
panic(errMissingPublisher)
}
c.publisher.Publish(topic, msg, meta)
}
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (c *Consumer) Subscribe(topic string, fun interface{}) {
name := fmt.Sprintf("subscription for topic %q", topic)
c.SystemProcess(name, func() {
if c.subscriber == nil {
panic(errMissingSubscriber)
}
stream := c.subscriber.Subscribe(c.String(), topic)
defer stream.Close()
cf, err := caller.New(fun)
if err != nil {
panic(err)
}
for {
select {
case msg := <-stream.Messages():
c.Process(func() { cf.Call(msg) })
case <-c.ShutdownRequested():
return
}
}
})
}