System processes are now named and supervised

This commit is contained in:
Gregory Eremin 2015-10-17 04:51:05 +03:00
parent a8443577d2
commit 40dbc556f5
3 changed files with 53 additions and 28 deletions

View File

@ -62,27 +62,29 @@ var (
// Process creates a task and then adds it to processing queue. // Process creates a task and then adds it to processing queue.
func (d *BaseDaemon) Process(a Actor) { func (d *BaseDaemon) Process(a Actor) {
d.enqueue(a, false)
}
// SystemProcess creates a system task that is restarted in case of failure
// and then adds it to processing queue.
func (d *BaseDaemon) SystemProcess(a Actor) {
d.enqueue(a, true)
}
func (d *BaseDaemon) enqueue(a Actor, system bool) {
d.queue <- &task{ d.queue <- &task{
daemon: d.self, daemon: d.self,
actor: a, actor: a,
createdAt: time.Now(), createdAt: time.Now(),
system: system, }
}
// SystemProcess creates a system task that is restarted in case of failure
// and then adds it to processing queue.
func (d *BaseDaemon) SystemProcess(name string, a Actor) {
d.queue <- &task{
daemon: d.self,
actor: a,
createdAt: time.Now(),
system: true,
name: name,
} }
} }
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg. // Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (d *BaseDaemon) Subscribe(topic string, fun interface{}) { func (d *BaseDaemon) Subscribe(topic string, fun interface{}) {
d.SystemProcess(func() { name := fmt.Sprintf("%s subscription for topic %q", d.String(), topic)
d.SystemProcess(name, func() {
if d.subscribeFunc == nil { if d.subscribeFunc == nil {
panic(errMissingSubscriptionFun) panic(errMissingSubscriptionFun)
} }
@ -151,13 +153,11 @@ func (d *BaseDaemon) base() *BaseDaemon {
return d return d
} }
func (d *BaseDaemon) handlePanic() { func (d *BaseDaemon) handlePanic(err interface{}) {
if err := recover(); err != nil {
d.stats.registerError() d.stats.registerError()
if d.panicHandler != nil { if d.panicHandler != nil {
d.panicHandler() d.panicHandler()
} }
log.Printf("Daemon %s recovered from panic. Error: %v\n", d, err) log.Printf("Daemon %s recovered from a panic\nError: %v\n", d, err)
debug.PrintStack() debug.PrintStack()
} }
}

View File

@ -19,14 +19,17 @@ func (n *NumberPrinter) Startup() {
log.Println("Oh, crap! There was a panic, take a look:") log.Println("Oh, crap! There was a panic, take a look:")
}) })
n.SystemProcess(n.enqueue) n.SystemProcess("Random Number Generator", n.generateNumbers)
} }
// Shutdown is empty due to the lack of cleanup. // Shutdown is empty due to the lack of cleanup.
func (n *NumberPrinter) Shutdown() {} func (n *NumberPrinter) Shutdown() {}
func (n *NumberPrinter) enqueue() { func (n *NumberPrinter) generateNumbers() {
for n.Continue() { for n.Continue() {
if rand.Intn(7) == 0 {
panic("Number generator don't work on Sundays!")
}
// Generate a random number between 1000 and 9999 and print it // Generate a random number between 1000 and 9999 and print it
num := 1000 + rand.Intn(9000) num := 1000 + rand.Intn(9000)
n.Process(n.makeActor(num)) n.Process(n.makeActor(num))
@ -39,10 +42,10 @@ func (n *NumberPrinter) enqueue() {
func (n *NumberPrinter) makeActor(num int) satan.Actor { func (n *NumberPrinter) makeActor(num int) satan.Actor {
return func() { return func() {
// Making it crash sometimes // Making it crash sometimes
if rand.Intn(20) == 0 { if rand.Intn(10) == 0 {
panic("Noooooooooo!") panic("Nooooo! Random number generator returned a zero!")
} }
log.Println("NumberPrinter says", num) log.Println("Number printer says:", num)
} }
} }

View File

@ -1,7 +1,9 @@
package satan package satan
import ( import (
"fmt"
"log" "log"
"runtime/debug"
"sync" "sync"
"time" "time"
) )
@ -43,6 +45,7 @@ type task struct {
actor Actor actor Actor
createdAt time.Time createdAt time.Time
system bool system bool
name string
} }
const ( const (
@ -108,8 +111,6 @@ func (s *Satan) runWorker(i int) {
case t := <-s.queue: case t := <-s.queue:
dur := time.Now().UnixNano() - t.createdAt.UnixNano() dur := time.Now().UnixNano() - t.createdAt.UnixNano()
s.latency.add(time.Duration(dur)) s.latency.add(time.Duration(dur))
// log.Printf("Daemon #%d got some job to do!", i+1)
s.processTask(t) s.processTask(t)
default: default:
select { select {
@ -122,7 +123,20 @@ func (s *Satan) runWorker(i int) {
} }
func (s *Satan) processTask(t *task) { func (s *Satan) processTask(t *task) {
defer t.daemon.base().handlePanic() defer func() {
if err := recover(); err != nil {
if t.system {
log.Printf("System process %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
// Restarting system task
s.queue <- t
} else {
t.daemon.base().handlePanic(err)
}
}
}()
start := time.Now() start := time.Now()
t.actor() // <--- THE ACTION HAPPENS HERE t.actor() // <--- THE ACTION HAPPENS HERE
@ -130,3 +144,11 @@ func (s *Satan) processTask(t *task) {
dur := time.Now().UnixNano() - start.UnixNano() dur := time.Now().UnixNano() - start.UnixNano()
t.daemon.base().stats.add(time.Duration(dur)) t.daemon.base().stats.add(time.Duration(dur))
} }
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon.base())
}
return fmt.Sprintf("%s[%s]", t.daemon.base(), t.name)
}