1
0
Fork 0
shezmu/satan.go

196 lines
4.0 KiB
Go
Raw Normal View History

2015-10-14 01:11:29 +00:00
package satan
import (
2015-10-18 00:22:07 +00:00
"fmt"
2015-10-14 01:11:29 +00:00
"log"
2015-10-18 00:22:07 +00:00
"runtime/debug"
2015-10-14 01:11:29 +00:00
"sync"
"sync/atomic"
2015-10-14 01:11:29 +00:00
"time"
)
// Satan is the master daemon.
type Satan struct {
2015-10-17 00:41:21 +00:00
SubscribeFunc SubscribeFunc
Publisher Publisher
2015-10-23 23:41:19 +00:00
Statistics Statistics
2015-10-17 00:41:21 +00:00
2015-10-18 00:22:07 +00:00
daemons []Daemon
queue chan *task
wgWorkers sync.WaitGroup
wgSystem sync.WaitGroup
shutdownWorkers chan struct{}
shutdownSystem chan struct{}
2015-10-14 01:11:29 +00:00
}
2015-10-15 23:07:04 +00:00
// Actor is a function that could be executed by daemon workers. It takes no
// arguments and returns nothing; in this file it is stored in a task's actor
// field and invoked by processSystemTask and processGeneralTask.
type Actor func()
2015-10-17 00:41:21 +00:00
// SubscribeFunc is a function that is used by daemons to subscribe to
// messages. It takes two strings, consumer and topic, and returns a Streamer.
// Satan's own SubscribeFunc field is copied into each daemon by AddDaemon.
type SubscribeFunc func(consumer, topic string) Streamer
// Streamer wraps a message consumer. There are no error returns: reporting
// failures is left to the implementation, and panicking is acceptable.
type Streamer interface {
	// Messages returns the receive-only channel of raw message payloads.
	Messages() <-chan []byte
	// Close is the consumer's close hook.
	Close()
}
// Publisher wraps a message publisher. There are no error returns: reporting
// failures is left to the implementation, and panicking is acceptable.
type Publisher interface {
	// Publish hands one raw message payload to the publisher.
	Publish(msg []byte)
	// Close is the publisher's close hook.
	Close()
}
2015-10-23 23:41:19 +00:00
// Statistics is the interface that wraps a metrics sink. Satan reports task
// durations and recovered panics to it, keyed by a name string.
type Statistics interface {
	// Add records a duration under the given name. Satan uses the daemon's
	// name for task run time and the fixed name "Latency" for queue latency.
	Add(daemonName string, dur time.Duration)
	// Error records that a task of the named daemon panicked.
	Error(daemonName string)
}
2015-10-18 00:22:07 +00:00
// task is one unit of work travelling through Satan's queue.
type task struct {
	// daemon is the daemon the task belongs to; it is used for naming,
	// statistics and panic handling.
	daemon Daemon
	// actor is the function a worker runs for this task.
	actor Actor
	// createdAt is the enqueue time; processTask derives queue latency from it.
	createdAt time.Time
	// system routes the task to processSystemTask, which restarts it after a
	// panic. Other tasks go to processGeneralTask.
	system bool
	// name is an optional label used by String.
	name string
}
2015-10-14 01:11:29 +00:00
// defaultNumWorkers is the number of worker goroutines StartDaemons spawns.
const defaultNumWorkers = 10
// workerIndex is a package-wide counter that runWorker increments atomically
// to give each worker a number for its log lines.
var workerIndex uint64
2015-10-14 01:11:29 +00:00
// Summon creates a new instance of Satan.
func Summon() *Satan {
return &Satan{
2015-10-18 00:22:07 +00:00
queue: make(chan *task),
shutdownWorkers: make(chan struct{}),
shutdownSystem: make(chan struct{}),
2015-10-14 01:11:29 +00:00
}
}
// AddDaemon adds a new daemon.
func (s *Satan) AddDaemon(d Daemon) {
2015-10-17 00:41:21 +00:00
base := d.base()
base.self = d
base.subscribeFunc = s.SubscribeFunc
base.publisher = s.Publisher
base.queue = s.queue
2015-10-18 00:22:07 +00:00
base.shutdown = s.shutdownSystem
2015-10-14 01:11:29 +00:00
2015-10-17 00:41:21 +00:00
go d.Startup()
2015-10-14 01:11:29 +00:00
s.daemons = append(s.daemons, d)
}
// StartDaemons starts all registered daemons.
func (s *Satan) StartDaemons() {
s.addWorkers(defaultNumWorkers)
2015-10-14 01:11:29 +00:00
}
// StopDaemons stops all running daemons.
func (s *Satan) StopDaemons() {
2015-10-18 00:22:07 +00:00
close(s.shutdownSystem)
2015-10-14 01:11:29 +00:00
for _, d := range s.daemons {
d.Shutdown()
}
2015-10-17 02:14:09 +00:00
2015-10-18 00:22:07 +00:00
s.wgSystem.Wait()
close(s.shutdownWorkers)
s.wgWorkers.Wait()
2015-10-15 23:07:04 +00:00
close(s.queue)
2015-10-14 01:11:29 +00:00
}
func (s *Satan) addWorkers(num int) {
for i := 0; i < num; i++ {
go s.runWorker()
}
}
func (s *Satan) stopWorkers(num int) {
for i := 0; i < num; i++ {
s.shutdownWorkers <- struct{}{}
}
}
func (s *Satan) runWorker() {
2015-10-18 00:22:07 +00:00
s.wgWorkers.Add(1)
defer s.wgWorkers.Done()
i := atomic.AddUint64(&workerIndex, 1)
2015-10-23 23:41:19 +00:00
log.Printf("Starting worker #%d", i)
defer log.Printf("Worker #%d has stopped", i)
2015-10-14 01:11:29 +00:00
for {
select {
case t := <-s.queue:
2015-10-18 00:22:07 +00:00
s.processTask(t)
case <-s.shutdownWorkers:
return
}
2015-10-14 01:11:29 +00:00
}
}
2015-10-18 00:22:07 +00:00
func (s *Satan) processTask(t *task) {
dur := time.Now().UnixNano() - t.createdAt.UnixNano()
2015-10-23 23:41:19 +00:00
s.Statistics.Add("Latency", time.Duration(dur))
2015-10-18 00:22:07 +00:00
if t.system {
s.processSystemTask(t)
} else {
s.processGeneralTask(t)
}
}
func (s *Satan) processSystemTask(t *task) {
s.wgSystem.Add(1)
defer s.wgSystem.Done()
defer func() {
if err := recover(); err != nil {
log.Printf("System task %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
t.createdAt = time.Now()
2015-10-18 00:22:07 +00:00
s.queue <- t // Restarting task
} else {
log.Printf("System task %s has stopped\n", t)
}
}()
log.Printf("Starting system task %s\n", t)
t.actor() // <--- THE ACTION HAPPENS HERE
}
func (s *Satan) processGeneralTask(t *task) {
2015-10-23 23:40:20 +00:00
defer func() {
if err := recover(); err != nil {
2015-10-23 23:41:19 +00:00
if s.Statistics != nil {
s.Statistics.Error(t.daemon.base().String())
}
2015-10-23 23:40:20 +00:00
t.daemon.base().handlePanic(err)
log.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err)
debug.PrintStack()
}
}()
2015-10-23 23:41:19 +00:00
if s.Statistics != nil {
defer func(start time.Time) {
dur := time.Now().UnixNano() - start.UnixNano()
s.Statistics.Add(t.daemon.base().String(), time.Duration(dur))
}(time.Now())
}
2015-10-18 00:22:07 +00:00
t.actor() // <--- THE ACTION HAPPENS HERE
}
// String renders the task for log output: "<daemon>[<name>]" when the task
// has a name, "[unnamed <daemon> process]" otherwise.
func (t *task) String() string {
	owner := t.daemon.base()
	if t.name != "" {
		return fmt.Sprintf("%s[%s]", owner, t.name)
	}
	return fmt.Sprintf("[unnamed %s process]", owner)
}