1
0
Fork 0
shezmu/daemon.go

115 lines
3.0 KiB
Go
Raw Normal View History

2015-10-14 00:50:43 +00:00
package satan
2015-10-14 00:01:47 +00:00
2015-10-14 00:02:22 +00:00
import (
"fmt"
"log"
"runtime/debug"
2015-10-14 00:02:22 +00:00
"strings"
"time"
)
2015-10-14 00:01:47 +00:00
// Daemon is the interface that contains a set of methods required to be
// implemented in order to be treated as a daemon.
type Daemon interface {
2015-10-14 00:29:44 +00:00
// Startup implementation should:
2015-10-14 00:01:47 +00:00
//
// func (d *DaemonName) Startup() {
2015-10-14 00:29:44 +00:00
// // 1. Set up a panic handler
2015-10-14 00:01:47 +00:00
// b.HandlePanics(func() {
// log.Error("Oh, crap!")
// })
//
2015-10-14 00:29:44 +00:00
// // 2. If the daemon is also a consumer we need to subscribe for
// // topics that would be consumed by the daemon
2015-10-14 00:01:47 +00:00
// b.Subscribe("ProductPriceUpdates", func(p PriceUpdate) {
// log.Printf("Price for %q is now $%.2f", p.Product, p.Amount)
// })
//
2015-10-14 00:29:44 +00:00
// // 3. If the daemon is doing some IO it is a good idea to limit the
// // rate of its execution
// b.LimitRate(10, 1 * time.Second)
2015-10-14 00:01:47 +00:00
// }
Startup()
// Shutdown implementation should clean up all daemon related stuff:
// close channels, process the last batch of items, etc.
Shutdown()
// base is a (hack) function that allows the Daemon interface to reference
// underlying BaseDaemon structure.
base() *BaseDaemon
}
2015-10-14 00:02:22 +00:00
// Actor is a unit of work: a function with no arguments and no results that
// could be executed by daemon workers.
type Actor func()
// BaseDaemon is the parent structure for all daemons.
type BaseDaemon struct {
self Daemon
name string
stats *statistics
2015-10-14 01:11:29 +00:00
queue chan<- *task
2015-10-14 00:02:22 +00:00
panicHandler func()
shutdown chan struct{}
}
// Process creates a task and then adds it to processing queue.
func (b *BaseDaemon) Process(a Actor) {
2015-10-14 01:11:29 +00:00
b.queue <- &task{
2015-10-14 00:02:22 +00:00
daemon: b.self,
actor: a,
createdAt: time.Now(),
2015-10-14 01:11:29 +00:00
}
2015-10-14 00:02:22 +00:00
}
// HandlePanics installs f as the panic handler of the daemon. The handler, if
// any, is invoked when the daemon recovers from a panic.
func (b *BaseDaemon) HandlePanics(f func()) {
	b.panicHandler = f
}
// ShutdownRequested exposes the daemon's shutdown channel as receive-only.
// The channel is closed the moment daemon shutdown is requested, so it can be
// used in a select to be notified about it.
func (b *BaseDaemon) ShutdownRequested() <-chan struct{} {
	return b.shutdown
}
// ShouldShutdown reports whether the daemon should shut down. It returns true
// once the channel exposed by ShutdownRequested has been closed and false
// while the daemon may keep running. It never blocks.
//
// A daemon whose shutdown channel is nil (e.g. one that was never
// initialized) is reported as shutting down as well.
func (b *BaseDaemon) ShouldShutdown() bool {
	// Keep the historical answer for a missing channel: a receive from a nil
	// channel never succeeds, so the select below would report false.
	if b.shutdown == nil {
		return true
	}

	select {
	case <-b.shutdown:
		// A receive from a closed channel succeeds immediately.
		return true
	default:
		return false
	}
}
// String returns the name of the daemon's underlying struct: the type name of
// the concrete daemon with the package qualifier stripped, so that
// "*pkg.PriceConsumer" becomes "PriceConsumer". The name is computed on the
// first call and cached for subsequent ones.
//
// When the type name carries no package qualifier (most notably "<nil>" for a
// daemon without a concrete daemon attached) the whole type name is used
// instead of panicking with an index out of range error.
func (b *BaseDaemon) String() string {
	if b.name == "" {
		typeName := fmt.Sprintf("%T", b.self)
		// Keep the segment that follows the first dot, but only when there
		// is one.
		if parts := strings.Split(typeName, "."); len(parts) > 1 {
			b.name = parts[1]
		} else {
			b.name = typeName
		}
	}
	return b.name
}
// initialize saves a reference to the child daemon which is then used to print
// the daemons' name. It also initializes other struct fields.
2015-10-14 01:11:29 +00:00
func (b *BaseDaemon) initialize(self Daemon, queue chan<- *task) {
2015-10-14 00:02:22 +00:00
b.self = self
b.stats = newStatistics()
2015-10-14 01:11:29 +00:00
b.queue = queue
2015-10-14 00:02:22 +00:00
b.shutdown = make(chan struct{})
}
// base returns the receiver itself. It is a (hack) method that makes the
// underlying BaseDaemon structure reachable through the Daemon interface.
func (b *BaseDaemon) base() *BaseDaemon {
	return b
}
func (b *BaseDaemon) handlePanic() {
if err := recover(); err != nil {
b.stats.registerError()
if b.panicHandler != nil {
b.panicHandler()
}
log.Printf("Daemon %s recovered from panic. Error: %v\n", b, err)
debug.PrintStack()
2015-10-14 00:02:22 +00:00
}
}