1
0
Fork 0
shezmu/shezmu.go

239 lines
5.0 KiB
Go
Raw Normal View History

2016-01-24 16:49:57 +00:00
package shezmu
2015-10-14 01:11:29 +00:00
import (
2015-10-18 00:22:07 +00:00
"fmt"
2015-10-14 01:11:29 +00:00
"log"
2015-10-24 16:25:16 +00:00
"os"
2016-07-26 21:52:48 +00:00
"os/signal"
2015-10-18 00:22:07 +00:00
"runtime/debug"
2015-10-14 01:11:29 +00:00
"sync"
2016-07-26 21:52:48 +00:00
"syscall"
2015-10-14 01:11:29 +00:00
"time"
2016-01-24 16:49:57 +00:00
"github.com/localhots/shezmu/stats"
2015-10-14 01:11:29 +00:00
)
2016-01-24 16:49:57 +00:00
// Shezmu is the master daemon.
type Shezmu struct {
DaemonStats stats.Publisher
2016-01-17 13:42:05 +00:00
Logger Logger
NumWorkers int
daemons []Daemon
queue chan *task
runtimeStats stats.Manager
2015-10-18 00:22:07 +00:00
wgWorkers sync.WaitGroup
wgSystem sync.WaitGroup
shutdownWorkers chan struct{}
shutdownSystem chan struct{}
2015-10-14 01:11:29 +00:00
}
2015-10-15 23:07:04 +00:00
// Actor is a unit of work: a plain function with no arguments and no results
// that daemon workers can execute.
type Actor func()
2016-01-17 13:42:05 +00:00
// Logger is the minimal logging contract required by Shezmu and its daemons.
type Logger interface {
	// Printf logs a message built from a format string and its arguments.
	Printf(format string, v ...interface{})
	// Println logs its arguments as a single line.
	Println(v ...interface{})
}
2015-10-18 00:22:07 +00:00
// task is a single unit of work travelling through the queue.
type task struct {
	// daemon is the daemon the task belongs to.
	daemon Daemon
	// actor is the function a worker invokes to perform the task.
	actor Actor
	// createdAt is the moment the task was created; workers use it to measure
	// how long the task waited before being picked up.
	createdAt time.Time
	// system selects the system-task processing path instead of the general
	// one.
	system bool
	// name is a label included in the task's string representation.
	name string
}
// DefaultNumWorkers is the default number of workers that would process
// tasks.
const DefaultNumWorkers = 100
2016-01-24 16:49:57 +00:00
// Summon creates a new instance of Shezmu.
func Summon() *Shezmu {
return &Shezmu{
DaemonStats: &stats.Void{},
2016-07-19 15:29:55 +00:00
Logger: log.New(os.Stdout, "", log.LstdFlags),
NumWorkers: DefaultNumWorkers,
2015-10-27 00:42:00 +00:00
queue: make(chan *task),
runtimeStats: stats.NewBasicStats(),
shutdownWorkers: make(chan struct{}),
shutdownSystem: make(chan struct{}),
2015-10-14 01:11:29 +00:00
}
}
// AddDaemon adds a new daemon.
2016-01-24 16:49:57 +00:00
func (s *Shezmu) AddDaemon(d Daemon) {
2015-10-17 00:41:21 +00:00
base := d.base()
base.self = d
base.queue = s.queue
2015-10-24 16:25:16 +00:00
base.logger = s.Logger
2015-10-18 00:22:07 +00:00
base.shutdown = s.shutdownSystem
2015-10-14 01:11:29 +00:00
s.daemons = append(s.daemons, d)
}
2016-01-24 22:39:48 +00:00
// ClearDaemons forgets every daemon that was previously added. StopDaemons()
// MUST be called before calling ClearDaemons().
func (s *Shezmu) ClearDaemons() {
	s.daemons = make([]Daemon, 0)
}
2015-10-14 01:11:29 +00:00
// StartDaemons starts all registered daemons.
2016-01-24 16:49:57 +00:00
func (s *Shezmu) StartDaemons() {
2015-10-27 00:56:08 +00:00
s.Logger.Printf("Starting %d workers", s.NumWorkers)
for i := 0; i < s.NumWorkers; i++ {
go s.runWorker()
2015-10-27 00:33:04 +00:00
}
2016-01-25 01:15:17 +00:00
s.Logger.Println("Setting up daemons")
for _, d := range s.daemons {
s.setupDaemon(d)
}
2015-10-14 01:11:29 +00:00
}
// StopDaemons stops all running daemons.
2016-01-24 16:49:57 +00:00
func (s *Shezmu) StopDaemons() {
2015-10-18 00:22:07 +00:00
close(s.shutdownSystem)
2015-10-14 01:11:29 +00:00
for _, d := range s.daemons {
d.Shutdown()
}
2015-10-17 02:14:09 +00:00
2015-10-18 00:22:07 +00:00
s.wgSystem.Wait()
close(s.shutdownWorkers)
2016-07-19 15:30:40 +00:00
2015-10-18 00:22:07 +00:00
s.wgWorkers.Wait()
2015-10-15 23:07:04 +00:00
close(s.queue)
2016-07-19 15:30:40 +00:00
// Re-open closed channels to allow starting new deamons afterwards
s.shutdownSystem = make(chan struct{})
s.shutdownWorkers = make(chan struct{})
s.queue = make(chan *task)
fmt.Println(s.runtimeStats.Fetch(stats.Latency))
2015-10-14 01:11:29 +00:00
}
2016-07-26 21:52:48 +00:00
// HandleSignals blocks until the process receives SIGINT, then stops all
// daemons via StopDaemons and returns. Any other signal delivered on the
// channel is logged and ignored.
func (s *Shezmu) HandleSignals() {
	// signal.Notify does not block when sending to the channel, so the channel
	// must be buffered; otherwise a signal that arrives while nobody is
	// receiving would be lost.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT)

	for {
		switch sig := <-ch; sig {
		case syscall.SIGINT:
			s.StopDaemons()
			return
		default:
			s.Logger.Printf("Signal ignored: %s", sig)
		}
	}
}
2016-01-25 01:15:17 +00:00
// setupDaemon enqueues a system task that runs the daemon's Startup method.
// A panic raised while enqueueing (for example a send on a queue that has
// already been closed) is recovered and logged instead of propagating.
func (s *Shezmu) setupDaemon(d Daemon) {
	defer func() {
		if recover() != nil {
			s.Logger.Printf("Failed to setup daemon %s due to process termination", d)
		}
	}()

	startup := &task{
		daemon:    d,
		actor:     d.Startup,
		createdAt: time.Now(),
		system:    true,
		name:      "startup",
	}
	s.queue <- startup
}
2016-01-24 16:49:57 +00:00
func (s *Shezmu) runWorker() {
2015-10-18 00:22:07 +00:00
s.wgWorkers.Add(1)
defer s.wgWorkers.Done()
2015-10-24 15:49:38 +00:00
defer func() {
if err := recover(); err != nil {
2015-10-27 00:56:08 +00:00
s.Logger.Printf("Worker crashed. Error: %v\n", err)
2015-10-24 15:49:38 +00:00
debug.PrintStack()
go s.runWorker() // Restarting worker
}
}()
2015-10-14 01:11:29 +00:00
for {
select {
case t := <-s.queue:
2015-10-18 00:22:07 +00:00
s.processTask(t)
case <-s.shutdownWorkers:
return
}
2015-10-14 01:11:29 +00:00
}
}
2015-10-18 00:22:07 +00:00
2016-01-24 16:49:57 +00:00
func (s *Shezmu) processTask(t *task) {
dur := time.Now().Sub(t.createdAt)
s.runtimeStats.Add(stats.Latency, dur)
2015-10-18 00:22:07 +00:00
if t.system {
s.processSystemTask(t)
} else {
s.processGeneralTask(t)
}
}
2016-01-24 16:49:57 +00:00
func (s *Shezmu) processSystemTask(t *task) {
// Abort starting a system task if shutdown was already called. Otherwise
// incrementing a wait group counter will cause a panic. This should be an
// extremely rare scenario when a system task crashes and tries to restart
// after a shutdown call.
select {
case <-s.shutdownSystem:
return
default:
}
2015-10-18 00:22:07 +00:00
s.wgSystem.Add(1)
defer s.wgSystem.Done()
defer func() {
if err := recover(); err != nil {
2015-10-24 16:25:16 +00:00
s.Logger.Printf("System task %s recovered from a panic\nError: %v\n", t, err)
2015-10-18 00:22:07 +00:00
debug.PrintStack()
t.createdAt = time.Now()
2015-10-18 00:22:07 +00:00
s.queue <- t // Restarting task
} else {
2016-01-25 01:15:17 +00:00
s.Logger.Printf("System task %s finished\n", t)
2015-10-18 00:22:07 +00:00
}
}()
2015-10-24 16:25:16 +00:00
s.Logger.Printf("Starting system task %s\n", t)
2015-10-24 00:07:47 +00:00
t.actor() // <--- ACTION STARTS HERE
2015-10-18 00:22:07 +00:00
}
2016-01-24 16:49:57 +00:00
func (s *Shezmu) processGeneralTask(t *task) {
2015-10-23 23:40:20 +00:00
defer func() {
2016-07-26 22:05:22 +00:00
if val := recover(); val != nil {
err := interfaceToError(val)
2016-01-24 22:39:38 +00:00
s.DaemonStats.Error(t.daemon.String())
2015-10-23 23:40:20 +00:00
t.daemon.base().handlePanic(err)
2016-07-26 22:05:22 +00:00
s.Logger.Printf("Daemon %s recovered from a panic\nError: %s\n", t.daemon, err.Error())
2015-10-23 23:40:20 +00:00
debug.PrintStack()
}
}()
defer func(start time.Time) {
dur := time.Now().Sub(start)
2016-01-24 22:39:38 +00:00
s.DaemonStats.Add(t.daemon.String(), dur)
}(time.Now())
2015-10-18 00:22:07 +00:00
2015-10-24 00:07:47 +00:00
t.actor() // <--- ACTION STARTS HERE
2015-10-18 00:22:07 +00:00
}
func (t *task) String() string {
2015-10-23 23:42:58 +00:00
return fmt.Sprintf("%s[%s]", t.daemon, t.name)
2015-10-18 00:22:07 +00:00
}
2016-07-26 22:05:22 +00:00
// interfaceToError turns an arbitrary value (typically one returned by
// recover) into an error. Values that already implement error are returned
// unchanged; anything else is formatted with %v into a new error.
func interfaceToError(val interface{}) error {
	switch v := val.(type) {
	case error:
		return v
	default:
		return fmt.Errorf("%v", v)
	}
}