Make shutdown handling predictable

This commit is contained in:
Gregory Eremin 2015-10-18 03:22:07 +03:00
parent 559886f297
commit d847be21c6
6 changed files with 99 additions and 76 deletions

View File

@ -52,11 +52,14 @@ type BaseDaemon struct {
name string name string
stats *statistics stats *statistics
queue chan<- *task queue chan<- *task
panicHandler func() panicHandler PanicHandler
shutdown chan struct{} shutdown chan struct{}
limit *ratelimit.Bucket limit *ratelimit.Bucket
} }
// PanicHandler is a function that handles panics. Duh!
type PanicHandler func(interface{})
var ( var (
errMissingSubscriptionFun = errors.New("subscription function is not set up") errMissingSubscriptionFun = errors.New("subscription function is not set up")
errMissingPublisher = errors.New("publisher is not set up") errMissingPublisher = errors.New("publisher is not set up")
@ -88,7 +91,7 @@ func (d *BaseDaemon) SystemProcess(name string, a Actor) {
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg. // Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
func (d *BaseDaemon) Subscribe(topic string, fun interface{}) { func (d *BaseDaemon) Subscribe(topic string, fun interface{}) {
name := fmt.Sprintf("%s subscription for topic %q", d.String(), topic) name := fmt.Sprintf("Subscription for topic %q", topic)
d.SystemProcess(name, func() { d.SystemProcess(name, func() {
if d.subscribeFunc == nil { if d.subscribeFunc == nil {
panic(errMissingSubscriptionFun) panic(errMissingSubscriptionFun)
@ -134,7 +137,7 @@ func (d *BaseDaemon) LimitRate(times int, per time.Duration) {
} }
// HandlePanics sets up a panic handler function for the daemon. // HandlePanics sets up a panic handler function for the daemon.
func (d *BaseDaemon) HandlePanics(f func()) { func (d *BaseDaemon) HandlePanics(f PanicHandler) {
d.panicHandler = f d.panicHandler = f
} }
@ -169,10 +172,15 @@ func (d *BaseDaemon) base() *BaseDaemon {
return d return d
} }
func (d *BaseDaemon) handlePanic(err interface{}) { func (d *BaseDaemon) handlePanic() {
err := recover()
if err == nil {
return
}
d.stats.registerError() d.stats.registerError()
if d.panicHandler != nil { if d.panicHandler != nil {
d.panicHandler() d.panicHandler(err)
} }
log.Printf("Daemon %s recovered from a panic\nError: %v\n", d, err) log.Printf("Daemon %s recovered from a panic\nError: %v\n", d, err)
debug.PrintStack() debug.PrintStack()

View File

@ -15,12 +15,12 @@ type NumberPrinter struct {
// Startup sets up panic handler and starts enqueuing number printing jobs. // Startup sets up panic handler and starts enqueuing number printing jobs.
func (n *NumberPrinter) Startup() { func (n *NumberPrinter) Startup() {
n.HandlePanics(func() { n.HandlePanics(func(err interface{}) {
log.Println("Oh, crap! There was a panic, take a look:") log.Printf("Oh, crap! There was a panic, take a look: %v", err)
}) })
n.SystemProcess("Random Number Generator", n.generateNumbers)
n.LimitRate(1, 2*time.Second) n.LimitRate(1, 2*time.Second)
n.SystemProcess("Random Number Generator", n.generateNumbers)
} }
// Shutdown is empty due to the lack of cleanup. // Shutdown is empty due to the lack of cleanup.

View File

@ -19,10 +19,10 @@ type PriceUpdate struct {
// Startup creates a new subscription for ProductPriceUpdates topic. // Startup creates a new subscription for ProductPriceUpdates topic.
func (p *PriceConsumer) Startup() { func (p *PriceConsumer) Startup() {
p.LimitRate(1, 500*time.Millisecond)
b.Subscribe("ProductPriceUpdates", func(u PriceUpdate) { b.Subscribe("ProductPriceUpdates", func(u PriceUpdate) {
log.Printf("Price for %q is now $%.2f", u.Product, u.Amount) log.Printf("Price for %q is now $%.2f", u.Product, u.Amount)
}) })
p.LimitRate(1, 500*time.Millisecond)
} }
// Shutdown is empty because PriceConsumer requires no cleanup upon exiting. // Shutdown is empty because PriceConsumer requires no cleanup upon exiting.

View File

@ -88,6 +88,7 @@ func Subscribe(consumer, topic string) satan.Streamer {
case err := <-pc.Errors(): case err := <-pc.Errors():
log.Println("Kafka error:", err.Error()) log.Println("Kafka error:", err.Error())
case <-stream.shutdown: case <-stream.shutdown:
pc.Close()
return return
} }
} }

View File

@ -1,7 +1,9 @@
package satan package satan
import ( import (
"fmt"
"log" "log"
"runtime/debug"
"sync" "sync"
"time" "time"
) )
@ -13,9 +15,12 @@ type Satan struct {
daemons []Daemon daemons []Daemon
queue chan *task queue chan *task
shutdown chan struct{}
wg sync.WaitGroup
latency *statistics latency *statistics
wgWorkers sync.WaitGroup
wgSystem sync.WaitGroup
shutdownWorkers chan struct{}
shutdownSystem chan struct{}
} }
// Actor is a function that could be executed by daemon workers. // Actor is a function that could be executed by daemon workers.
@ -38,6 +43,14 @@ type Publisher interface {
Close() Close()
} }
type task struct {
daemon Daemon
actor Actor
createdAt time.Time
system bool
name string
}
const ( const (
defaultNumWorkers = 10 defaultNumWorkers = 10
) )
@ -47,7 +60,8 @@ func Summon() *Satan {
return &Satan{ return &Satan{
queue: make(chan *task), queue: make(chan *task),
latency: newStatistics(), latency: newStatistics(),
shutdown: make(chan struct{}), shutdownWorkers: make(chan struct{}),
shutdownSystem: make(chan struct{}),
} }
} }
@ -58,7 +72,7 @@ func (s *Satan) AddDaemon(d Daemon) {
base.subscribeFunc = s.SubscribeFunc base.subscribeFunc = s.SubscribeFunc
base.publisher = s.Publisher base.publisher = s.Publisher
base.queue = s.queue base.queue = s.queue
base.shutdown = make(chan struct{}) base.shutdown = s.shutdownSystem
base.stats = newStatistics() base.stats = newStatistics()
go d.Startup() go d.Startup()
@ -67,50 +81,94 @@ func (s *Satan) AddDaemon(d Daemon) {
// StartDaemons starts all registered daemons. // StartDaemons starts all registered daemons.
func (s *Satan) StartDaemons() { func (s *Satan) StartDaemons() {
s.wg.Add(defaultNumWorkers)
for i := 0; i < defaultNumWorkers; i++ { for i := 0; i < defaultNumWorkers; i++ {
go func(i int) { go func(i int) {
s.runWorker(i) s.runWorker(i)
s.wg.Done()
}(i) }(i)
} }
} }
// StopDaemons stops all running daemons. // StopDaemons stops all running daemons.
func (s *Satan) StopDaemons() { func (s *Satan) StopDaemons() {
close(s.shutdownSystem)
for _, d := range s.daemons { for _, d := range s.daemons {
close(d.base().shutdown)
d.Shutdown() d.Shutdown()
}
s.wgSystem.Wait()
close(s.shutdownWorkers)
s.wgWorkers.Wait()
close(s.queue)
for _, d := range s.daemons {
stats := d.base().stats.snapshot() stats := d.base().stats.snapshot()
log.Printf("%s daemon performace statistics:\n%s\n", d.base(), stats) log.Printf("%s daemon performace statistics:\n%s\n", d.base(), stats)
} }
close(s.shutdown)
s.wg.Wait()
close(s.queue)
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot()) log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
} }
func (s *Satan) runWorker(i int) { func (s *Satan) runWorker(i int) {
s.wgWorkers.Add(1)
defer s.wgWorkers.Done()
log.Printf("Starting worker #%d", i+1) log.Printf("Starting worker #%d", i+1)
defer log.Printf("Worker #%d has stopped", i+1) defer log.Printf("Worker #%d has stopped", i+1)
for { for {
select { select {
case t := <-s.queue: case t := <-s.queue:
dur := time.Now().UnixNano() - t.createdAt.UnixNano() s.processTask(t)
s.latency.add(time.Duration(dur))
if restart := t.process(); restart {
s.queue <- t
}
default: default:
select { select {
case <-s.shutdown: case <-s.shutdownWorkers:
return return
default: default:
} }
} }
} }
} }
func (s *Satan) processTask(t *task) {
dur := time.Now().UnixNano() - t.createdAt.UnixNano()
s.latency.add(time.Duration(dur))
if t.system {
s.processSystemTask(t)
} else {
s.processGeneralTask(t)
}
}
func (s *Satan) processSystemTask(t *task) {
s.wgSystem.Add(1)
defer s.wgSystem.Done()
defer func() {
if err := recover(); err != nil {
log.Printf("System task %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
s.queue <- t // Restarting task
} else {
log.Printf("System task %s has stopped\n", t)
}
}()
log.Printf("Starting system task %s\n", t)
t.actor() // <--- THE ACTION HAPPENS HERE
}
func (s *Satan) processGeneralTask(t *task) {
defer t.daemon.base().handlePanic()
defer func(start time.Time) {
dur := time.Now().UnixNano() - start.UnixNano()
t.daemon.base().stats.add(time.Duration(dur))
}(time.Now())
t.actor() // <--- THE ACTION HAPPENS HERE
}
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon.base())
}
return fmt.Sprintf("%s[%s]", t.daemon.base(), t.name)
}

44
task.go
View File

@ -1,44 +0,0 @@
package satan
import (
"fmt"
"log"
"runtime/debug"
"time"
)
type task struct {
daemon Daemon
actor Actor
createdAt time.Time
system bool
name string
}
func (t *task) process() (restart bool) {
defer func(start time.Time) {
dur := time.Now().UnixNano() - start.UnixNano()
t.daemon.base().stats.add(time.Duration(dur))
if err := recover(); err != nil {
if t.system {
log.Printf("System process %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
restart = true
} else {
t.daemon.base().handlePanic(err)
}
}
}(time.Now())
t.actor() // <--- THE ACTION HAPPENS HERE
return
}
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon.base())
}
return fmt.Sprintf("%s[%s]", t.daemon.base(), t.name)
}