1
0
Fork 0
shezmu/daemon.go

178 lines
4.3 KiB
Go
Raw Normal View History

2015-10-14 00:50:43 +00:00
package satan
2015-10-14 00:01:47 +00:00
2015-10-14 00:02:22 +00:00
import (
2015-10-17 00:40:17 +00:00
"errors"
2015-10-14 00:02:22 +00:00
"fmt"
"log"
2015-10-14 00:02:22 +00:00
"strings"
"time"
2015-10-17 00:40:17 +00:00
2015-10-17 02:33:46 +00:00
"github.com/juju/ratelimit"
2015-10-17 00:40:17 +00:00
"github.com/localhots/satan/caller"
2015-10-14 00:02:22 +00:00
)
2015-10-14 00:01:47 +00:00
// Daemon is the interface that contains a set of methods required to be
// implemented in order to be treated as a daemon.
type Daemon interface {
2015-10-14 00:29:44 +00:00
// Startup implementation should:
2015-10-14 00:01:47 +00:00
//
// func (d *DaemonName) Startup() {
2015-10-14 00:29:44 +00:00
// // 1. Set up a panic handler
2015-10-14 00:01:47 +00:00
// b.HandlePanics(func() {
// log.Error("Oh, crap!")
// })
//
// // 2. If the daemon is doing some IO it is a good idea to limit the
// // rate of its execution
// b.LimitRate(10, 1 * time.Second)
//
// // 3. If the daemon is also a consumer we need to subscribe for
2015-10-14 00:29:44 +00:00
// // topics that would be consumed by the daemon
2015-10-14 00:01:47 +00:00
// b.Subscribe("ProductPriceUpdates", func(p PriceUpdate) {
// log.Printf("Price for %q is now $%.2f", p.Product, p.Amount)
// })
// }
Startup()
// Shutdown implementation should clean up all daemon related stuff:
// close channels, process the last batch of items, etc.
Shutdown()
// base is a (hack) function that allows the Daemon interface to reference
// underlying BaseDaemon structure.
base() *BaseDaemon
}
2015-10-14 00:02:22 +00:00
// BaseDaemon is the parent structure for all daemons.
type BaseDaemon struct {
2015-10-17 00:40:17 +00:00
subscribeFunc SubscribeFunc
publisher Publisher
self Daemon
name string
queue chan<- *task
2015-10-18 00:22:07 +00:00
panicHandler PanicHandler
2015-10-17 00:40:17 +00:00
shutdown chan struct{}
2015-10-17 02:33:46 +00:00
limit *ratelimit.Bucket
2015-10-14 00:02:22 +00:00
}
2015-10-18 00:22:07 +00:00
// PanicHandler is a function that handles panics. Duh!
type PanicHandler func(interface{})
2015-10-17 00:40:17 +00:00
var (
errMissingSubscriptionFun = errors.New("subscription function is not set up")
errMissingPublisher = errors.New("publisher is not set up")
)
2015-10-14 00:02:22 +00:00
// Process creates a task and then adds it to processing queue.
func (d *BaseDaemon) Process(a Actor) {
2015-10-17 02:33:46 +00:00
if d.limit != nil {
d.limit.Wait(1)
}
d.queue <- &task{
daemon: d.self,
actor: a,
createdAt: time.Now(),
}
}
// SystemProcess creates a system task that is restarted in case of failure
// and then adds it to processing queue.
func (d *BaseDaemon) SystemProcess(name string, a Actor) {
d.queue <- &task{
daemon: d.self,
2015-10-14 00:02:22 +00:00
actor: a,
createdAt: time.Now(),
system: true,
name: name,
2015-10-14 01:11:29 +00:00
}
2015-10-14 00:02:22 +00:00
}
2015-10-17 00:40:17 +00:00
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (d *BaseDaemon) Subscribe(topic string, fun interface{}) {
2015-10-18 00:22:07 +00:00
name := fmt.Sprintf("Subscription for topic %q", topic)
d.SystemProcess(name, func() {
2015-10-17 00:40:17 +00:00
if d.subscribeFunc == nil {
panic(errMissingSubscriptionFun)
}
stream := d.subscribeFunc(d.String(), topic)
defer stream.Close()
cf, err := caller.New(fun)
if err != nil {
panic(err)
}
for {
select {
case msg := <-stream.Messages():
d.Process(func() { cf.Call(msg) })
case <-d.shutdown:
return
}
}
})
}
// Publish sends a message to the publisher.
func (d *BaseDaemon) Publish(msg []byte) {
if d.publisher == nil {
panic(errMissingPublisher)
}
d.publisher.Publish(msg)
}
2015-10-17 02:37:13 +00:00
// LimitRate limits the daemons' processing rate.
2015-10-17 02:33:46 +00:00
func (d *BaseDaemon) LimitRate(times int, per time.Duration) {
rate := float64(time.Second) / float64(per) * float64(times)
2015-10-17 02:37:13 +00:00
if rate <= 0 {
2015-10-17 02:33:46 +00:00
log.Println("Daemon %s processing rate was limited to %d. Using 1 instead", d.base(), rate)
rate = 1.0
}
log.Printf("Daemon %s processing rate is limited to %.2f ops/s", d.base(), rate)
d.limit = ratelimit.NewBucketWithRate(rate, 1)
}
2015-10-14 00:02:22 +00:00
// HandlePanics sets up a panic handler function for the daemon.
2015-10-18 00:22:07 +00:00
func (d *BaseDaemon) HandlePanics(f PanicHandler) {
d.panicHandler = f
2015-10-14 00:02:22 +00:00
}
// ShutdownRequested returns a channel that is closed the moment daemon shutdown
// is requested.
func (d *BaseDaemon) ShutdownRequested() <-chan struct{} {
return d.shutdown
2015-10-14 00:02:22 +00:00
}
2015-10-15 23:27:03 +00:00
// Continue returns true if daemon should proceed and false if it should stop.
func (d *BaseDaemon) Continue() bool {
select {
case <-d.shutdown:
return false
2015-10-15 23:27:03 +00:00
default:
return true
}
2015-10-14 00:02:22 +00:00
}
// String returns the name of the Deamon unerlying struct.
func (d *BaseDaemon) String() string {
if d.name == "" {
d.name = strings.Split(fmt.Sprintf("%T", d.self), ".")[1]
2015-10-14 00:02:22 +00:00
}
return d.name
2015-10-14 00:02:22 +00:00
}
// base is a (hack) function that allows the Daemon interface to reference
// underlying BaseDaemon structure.
func (d *BaseDaemon) base() *BaseDaemon {
return d
2015-10-14 00:02:22 +00:00
}
2015-10-23 23:40:20 +00:00
func (d *BaseDaemon) handlePanic(err interface{}) {
if d.panicHandler != nil {
2015-10-18 00:22:07 +00:00
d.panicHandler(err)
2015-10-14 00:02:22 +00:00
}
}