1
0
Fork 0

Introduce Satan

This commit is contained in:
Gregory Eremin 2015-10-14 04:11:29 +03:00
parent 583ac5da5b
commit a2bfadfa66
3 changed files with 99 additions and 7 deletions

View File

@ -48,18 +48,18 @@ type BaseDaemon struct {
self Daemon
name string
stats *statistics
enqueue func(*task)
queue chan<- *task
panicHandler func()
shutdown chan struct{}
}
// Process creates a task and then adds it to processing queue.
func (b *BaseDaemon) Process(a Actor) {
b.enqueue(&task{
b.queue <- &task{
daemon: b.self,
actor: a,
createdAt: time.Now(),
})
}
}
// HandlePanics sets up a panic handler function for the daemon.
@ -89,10 +89,10 @@ func (b *BaseDaemon) String() string {
// initialize saves a reference to the child daemon which is then used to print
// the daemons' name. It also initializes other struct fields.
func (b *BaseDaemon) initialize(self Daemon, enqueue func(*task)) {
func (b *BaseDaemon) initialize(self Daemon, queue chan<- *task) {
b.self = self
b.stats = newStatistics()
b.enqueue = enqueue
b.queue = queue
b.shutdown = make(chan struct{})
}

View File

@ -21,8 +21,8 @@ func main() {
s := satan.Summon()
s.AddDaemon(&daemons.NumberPrinter{})
s.Start()
defer s.Stop()
s.StartDaemons()
defer s.StopDaemons()
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt)

92
satan.go Normal file
View File

@ -0,0 +1,92 @@
package satan
import (
"log"
"sync"
"time"
)
// Satan is the master daemon.
type Satan struct {
daemons []Daemon
queue chan *task
wg sync.WaitGroup
latency *statistics
shutdown chan struct{}
}
type task struct {
daemon Daemon
actor Actor
createdAt time.Time
}
const (
defaultNumWorkers = 10
)
// Summon creates a new instance of Satan.
func Summon() *Satan {
return &Satan{
queue: make(chan *task),
latency: newStatistics(),
shutdown: make(chan struct{}),
}
}
// AddDaemon adds a new daemon.
func (s *Satan) AddDaemon(d Daemon) {
d.base().initialize(d, s.queue)
go d.Startup()
s.daemons = append(s.daemons, d)
}
// StartDaemons starts all registered daemons.
func (s *Satan) StartDaemons() {
s.wg.Add(defaultNumWorkers)
for i := 0; i < defaultNumWorkers; i++ {
go func(i int) {
s.runWorker(i)
s.wg.Done()
}(i)
}
}
// StopDaemons stops all running daemons.
func (s *Satan) StopDaemons() {
for _, d := range s.daemons {
close(d.base().shutdown)
d.Shutdown()
log.Printf("%s daemon performace statistics:\n%s\n",
d.base(), d.base().stats.snapshot())
}
close(s.queue)
s.wg.Wait()
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
}
func (s *Satan) runWorker(i int) {
log.Printf("Starting worker #%d", i+1)
defer log.Printf("Worker #%d has stopped", i+1)
for t := range s.queue {
dur := time.Now().UnixNano() - t.createdAt.UnixNano()
s.latency.add(time.Duration(dur))
log.Printf("Daemon #%d got some job to do!", i+1)
s.processTask(t)
}
}
func (s *Satan) processTask(t *task) {
defer t.daemon.base().handlePanic()
start := time.Now()
t.actor()
dur := time.Now().UnixNano() - start.UnixNano()
t.daemon.base().stats.add(time.Duration(dur))
}