package satan import ( "fmt" "log" "os" "runtime/debug" "sync" "sync/atomic" "time" "github.com/localhots/satan/stats" ) // Satan is the master daemon. type Satan struct { SubscribeFunc SubscribeFunc Publisher Publisher DaemonStats stats.Publisher Logger *log.Logger MinNumWorkers uint32 MaxNumWorkers uint32 numWorkers int64 ScalePlan *ScalePlan daemons []Daemon queue chan *task runtimeStats stats.Manager wgWorkers sync.WaitGroup wgSystem sync.WaitGroup shutdownWorkers chan struct{} shutdownSystem chan struct{} } // Actor is a function that could be executed by daemon workers. type Actor func() // SubscribeFunc is a function that is used by daemons to subscribe to messages. type SubscribeFunc func(consumer, topic string) Streamer // Streamer is the interface that wraps message consumers. Error handling // should be provided by the implementation. Feel free to panic. type Streamer interface { Messages() <-chan []byte Close() } // Publisher is the interface that wraps message publishers. Error handling // should be provided by the implementation. Feel free to panic. type Publisher interface { Publish(msg []byte) Close() } type ScalePlan struct { Interval time.Duration MinProcessedTasks uint32 LatencyThreshold time.Duration TaskWaitThreshold time.Duration AdjustmentStep uint32 } type task struct { daemon Daemon actor Actor createdAt time.Time system bool name string } var ( workerIndex uint64 ) // Summon creates a new instance of Satan. func Summon() *Satan { return &Satan{ Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags), MinNumWorkers: 10, MaxNumWorkers: 1000, queue: make(chan *task), runtimeStats: stats.NewBasicStats(), shutdownWorkers: make(chan struct{}), shutdownSystem: make(chan struct{}), } } // AddDaemon adds a new daemon. func (s *Satan) AddDaemon(d Daemon) { base := d.base() base.self = d base.subscribeFunc = s.SubscribeFunc base.publisher = s.Publisher base.queue = s.queue base.logger = s.Logger base.shutdown = s.shutdownSystem go d.Startup() s.daemons = append(s.daemons, d) } // StartDaemons starts all registered daemons. func (s *Satan) StartDaemons() { s.addWorkers(s.MinNumWorkers) if s.ScalePlan != nil { go s.autoScale() } } // StopDaemons stops all running daemons. func (s *Satan) StopDaemons() { close(s.shutdownSystem) for _, d := range s.daemons { d.Shutdown() } s.wgSystem.Wait() close(s.shutdownWorkers) s.wgWorkers.Wait() close(s.queue) fmt.Println(s.runtimeStats.Fetch(stats.Latency)) fmt.Println(s.runtimeStats.Fetch(stats.TaskWait)) } func (s *Satan) addWorkers(num uint32) { for i := uint32(0); i < num; i++ { go s.runWorker() } } func (s *Satan) stopWorkers(num uint32) { for i := uint32(0); i < num; i++ { s.shutdownWorkers <- struct{}{} } } func (s *Satan) runWorker() { s.wgWorkers.Add(1) defer s.wgWorkers.Done() atomic.AddInt64(&s.numWorkers, 1) defer atomic.AddInt64(&s.numWorkers, -1) i := atomic.AddUint64(&workerIndex, 1) s.Logger.Printf("Starting worker #%d", i) defer func() { if err := recover(); err != nil { s.Logger.Printf("Worker #%d crashed. Error: %v\n", i, err) debug.PrintStack() go s.runWorker() // Restarting worker } }() for { start := time.Now() select { case t := <-s.queue: dur := time.Now().UnixNano() - start.UnixNano() s.runtimeStats.Add(stats.TaskWait, time.Duration(dur)) s.processTask(t) case <-s.shutdownWorkers: s.Logger.Printf("Worker #%d has stopped", i) return } } } func (s *Satan) processTask(t *task) { dur := time.Now().UnixNano() - t.createdAt.UnixNano() s.runtimeStats.Add(stats.Latency, time.Duration(dur)) if t.system { s.processSystemTask(t) } else { s.processGeneralTask(t) } } func (s *Satan) processSystemTask(t *task) { s.wgSystem.Add(1) defer s.wgSystem.Done() defer func() { if err := recover(); err != nil { s.Logger.Printf("System task %s recovered from a panic\nError: %v\n", t, err) debug.PrintStack() t.createdAt = time.Now() s.queue <- t // Restarting task } else { s.Logger.Printf("System task %s has stopped\n", t) } }() s.Logger.Printf("Starting system task %s\n", t) t.actor() // <--- ACTION STARTS HERE } func (s *Satan) processGeneralTask(t *task) { defer func() { if err := recover(); err != nil { if s.DaemonStats != nil { s.DaemonStats.Error(t.daemon.base().String()) } t.daemon.base().handlePanic(err) s.Logger.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err) debug.PrintStack() } }() if s.DaemonStats != nil { defer func(start time.Time) { dur := time.Now().UnixNano() - start.UnixNano() s.DaemonStats.Add(t.daemon.base().String(), time.Duration(dur)) }(time.Now()) } t.actor() // <--- ACTION STARTS HERE } func (s *Satan) autoScale() { t := time.NewTicker(s.ScalePlan.Interval) defer t.Stop() for { select { case <-t.C: s.adjustNumWorkers() case <-s.shutdownSystem: return } } } func (s *Satan) adjustNumWorkers() { lat := s.runtimeStats.Fetch(stats.Latency) tw := s.runtimeStats.Fetch(stats.TaskWait) if lat.Processed() < int64(s.ScalePlan.MinProcessedTasks) { return } if uint32(s.numWorkers)+s.ScalePlan.AdjustmentStep > s.MaxNumWorkers { return } if lat.P95() > float64(s.ScalePlan.LatencyThreshold) { s.addWorkers(s.ScalePlan.AdjustmentStep) s.runtimeStats.Reset() return } if uint32(s.numWorkers)-s.ScalePlan.AdjustmentStep < s.MinNumWorkers { return } if tw.P95() > float64(s.ScalePlan.TaskWaitThreshold) { s.stopWorkers(s.ScalePlan.AdjustmentStep) s.runtimeStats.Reset() return } } func (t *task) String() string { if t.name == "" { return fmt.Sprintf("[unnamed %s process]", t.daemon) } return fmt.Sprintf("%s[%s]", t.daemon, t.name) }