1
0
Fork 0

Add consumer support to daemons

This commit is contained in:
Gregory Eremin 2015-10-17 03:40:17 +03:00
parent 8eac0b59fb
commit dff6c81d04
1 changed files with 51 additions and 19 deletions

View File

@ -1,11 +1,14 @@
package satan package satan
import ( import (
"errors"
"fmt" "fmt"
"log" "log"
"runtime/debug" "runtime/debug"
"strings" "strings"
"time" "time"
"github.com/localhots/satan/caller"
) )
// Daemon is the interface that contains a set of methods required to be // Daemon is the interface that contains a set of methods required to be
@ -38,14 +41,12 @@ type Daemon interface {
// base is a (hack) function that allows the Daemon interface to reference // base is a (hack) function that allows the Daemon interface to reference
// underlying BaseDaemon structure. // underlying BaseDaemon structure.
base() *BaseDaemon base() *BaseDaemon
// initialize is also a hack that is used by the Satan to initialize
// base daemon fields.
initialize(self Daemon, queue chan<- *task)
} }
// BaseDaemon is the parent structure for all daemons. // BaseDaemon is the parent structure for all daemons.
type BaseDaemon struct { type BaseDaemon struct {
subscribeFunc SubscribeFunc
publisher Publisher
self Daemon self Daemon
name string name string
stats *statistics stats *statistics
@ -54,6 +55,11 @@ type BaseDaemon struct {
shutdown chan struct{} shutdown chan struct{}
} }
var (
errMissingSubscriptionFun = errors.New("subscription function is not set up")
errMissingPublisher = errors.New("publisher is not set up")
)
// Process creates a task and then adds it to processing queue. // Process creates a task and then adds it to processing queue.
func (d *BaseDaemon) Process(a Actor) { func (d *BaseDaemon) Process(a Actor) {
d.enqueue(a, false) d.enqueue(a, false)
@ -74,6 +80,41 @@ func (d *BaseDaemon) enqueue(a Actor, system bool) {
} }
} }
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (d *BaseDaemon) Subscribe(topic string, fun interface{}) {
d.SystemProcess(func() {
if d.subscribeFunc == nil {
panic(errMissingSubscriptionFun)
}
stream := d.subscribeFunc(d.String(), topic)
defer stream.Close()
cf, err := caller.New(fun)
if err != nil {
panic(err)
}
for {
select {
case msg := <-stream.Messages():
d.Process(func() { cf.Call(msg) })
case <-d.shutdown:
return
}
}
})
}
// Publish sends a message to the publisher.
func (d *BaseDaemon) Publish(msg []byte) {
if d.publisher == nil {
panic(errMissingPublisher)
}
d.publisher.Publish(msg)
}
// HandlePanics sets up a panic handler function for the daemon. // HandlePanics sets up a panic handler function for the daemon.
func (d *BaseDaemon) HandlePanics(f func()) { func (d *BaseDaemon) HandlePanics(f func()) {
d.panicHandler = f d.panicHandler = f
@ -104,15 +145,6 @@ func (d *BaseDaemon) String() string {
return d.name return d.name
} }
// initialize saves a reference to the child daemon which is then used to print
// the daemons' name. It also initializes other struct fields.
func (d *BaseDaemon) initialize(self Daemon, queue chan<- *task) {
d.self = self
d.stats = newStatistics()
d.queue = queue
d.shutdown = make(chan struct{})
}
// base is a (hack) function that allows the Daemon interface to reference // base is a (hack) function that allows the Daemon interface to reference
// underlying BaseDaemon structure. // underlying BaseDaemon structure.
func (d *BaseDaemon) base() *BaseDaemon { func (d *BaseDaemon) base() *BaseDaemon {