diff --git a/example/main.go b/example/main.go index 0ca346b..674c31c 100644 --- a/example/main.go +++ b/example/main.go @@ -26,7 +26,7 @@ func main() { s := satan.Summon() s.SubscribeFunc = kafka.Subscribe - s.Statistics = statsLogger + s.DaemonStats = statsLogger s.AddDaemon(&daemons.NumberPrinter{}) s.AddDaemon(&daemons.PriceConsumer{}) diff --git a/satan.go b/satan.go index 2e3e3df..73bfb0e 100644 --- a/satan.go +++ b/satan.go @@ -8,17 +8,20 @@ import ( "sync" "sync/atomic" "time" + + "github.com/localhots/satan/stats" ) // Satan is the master daemon. type Satan struct { SubscribeFunc SubscribeFunc Publisher Publisher - Statistics StatsPublisher + DaemonStats stats.Publisher Logger *log.Logger - daemons []Daemon - queue chan *task + daemons []Daemon + queue chan *task + runtimeStats stats.Manager wgWorkers sync.WaitGroup wgSystem sync.WaitGroup @@ -46,26 +49,6 @@ type Publisher interface { Close() } -type StatsManager interface { - StatsPublisher - StatsFetcher -} - -type StatsPublisher interface { - Add(name string, dur time.Duration) - Error(name string) -} - -type StatsFetcher interface { - Processed(name string) int64 - Errors(name string) int64 - Min(name string) int64 - Max(name string) int64 - P95(name string) float64 - Mean(name string) float64 - StdDev(name string) float64 -} - type task struct { daemon Daemon actor Actor @@ -87,6 +70,7 @@ func Summon() *Satan { return &Satan{ Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags), queue: make(chan *task), + runtimeStats: stats.NewBasicStats(), shutdownWorkers: make(chan struct{}), shutdownSystem: make(chan struct{}), } @@ -122,6 +106,9 @@ func (s *Satan) StopDaemons() { close(s.shutdownWorkers) s.wgWorkers.Wait() close(s.queue) + + fmt.Println(s.runtimeStats.Fetch(stats.Latency)) + fmt.Println(s.runtimeStats.Fetch(stats.TaskWait)) } func (s *Satan) addWorkers(num int) { @@ -156,7 +143,7 @@ func (s *Satan) runWorker() { select { case t := <-s.queue: dur := time.Now().UnixNano() - start.UnixNano() - s.Statistics.Add("TaskWait", time.Duration(dur)) + s.runtimeStats.Add(stats.TaskWait, time.Duration(dur)) s.processTask(t) case <-s.shutdownWorkers: s.Logger.Printf("Worker #%d has stopped", i) @@ -167,7 +154,7 @@ func (s *Satan) runWorker() { func (s *Satan) processTask(t *task) { dur := time.Now().UnixNano() - t.createdAt.UnixNano() - s.Statistics.Add("Latency", time.Duration(dur)) + s.runtimeStats.Add(stats.Latency, time.Duration(dur)) if t.system { s.processSystemTask(t) @@ -198,18 +185,18 @@ func (s *Satan) processSystemTask(t *task) { func (s *Satan) processGeneralTask(t *task) { defer func() { if err := recover(); err != nil { - if s.Statistics != nil { - s.Statistics.Error(t.daemon.base().String()) + if s.DaemonStats != nil { + s.DaemonStats.Error(t.daemon.base().String()) } t.daemon.base().handlePanic(err) s.Logger.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err) debug.PrintStack() } }() - if s.Statistics != nil { + if s.DaemonStats != nil { defer func(start time.Time) { dur := time.Now().UnixNano() - start.UnixNano() - s.Statistics.Add(t.daemon.base().String(), time.Duration(dur)) + s.DaemonStats.Add(t.daemon.base().String(), time.Duration(dur)) }(time.Now()) } diff --git a/stats/base.go b/stats/base.go index 9269285..8889b44 100644 --- a/stats/base.go +++ b/stats/base.go @@ -1,22 +1,53 @@ package stats import ( + "fmt" "sync" "time" "github.com/rcrowley/go-metrics" ) +type Manager interface { + Publisher + Fetcher +} + +type Publisher interface { + Add(name string, dur time.Duration) + Error(name string) +} + +type Fetcher interface { + Fetch(name string) Stats +} + +type Stats interface { + Processed() int64 + Errors() int64 + Min() int64 + Max() int64 + P95() float64 + Mean() float64 + StdDev() float64 +} + type base struct { sync.Mutex stats map[string]*baseStats } type baseStats struct { + name string time metrics.Histogram errors metrics.Counter } +const ( + Latency = "Latency" + TaskWait = "TaskWait" +) + func (b *base) Add(name string, dur time.Duration) { b.metrics(name).time.Update(int64(dur)) } @@ -25,6 +56,58 @@ func (b *base) Error(name string) { b.metrics(name).errors.Inc(1) } +func (b *base) Fetch(name string) Stats { + return b.metrics(name) +} + +func (s *baseStats) Processed() int64 { + return s.time.Count() +} + +func (s *baseStats) Errors() int64 { + return s.errors.Count() +} + +func (s *baseStats) Min() int64 { + return s.time.Min() +} + +func (s *baseStats) Max() int64 { + return s.time.Max() +} + +func (s *baseStats) P95() float64 { + return s.time.Percentile(0.95) +} + +func (s *baseStats) Mean() float64 { + return s.time.Mean() +} + +func (s *baseStats) StdDev() float64 { + return s.time.StdDev() +} + +func (s *baseStats) String() string { + return fmt.Sprintf("%s statistics:\n"+ + "Processed: %10d\n"+ + "Errors: %10d\n"+ + "Min: %10s\n"+ + "Mean: %10s\n"+ + "95%%: %10s\n"+ + "Max: %10s\n"+ + "StdDev: %10s", + s.name, + s.time.Count(), + s.errors.Count(), + formatDuration(float64(s.time.Min())), + formatDuration(s.time.Mean()), + formatDuration(s.time.Percentile(0.95)), + formatDuration(float64(s.time.Max())), + formatDuration(s.time.StdDev()), + ) +} + func (b *base) init() { b.stats = make(map[string]*baseStats) } @@ -40,6 +123,7 @@ func (b *base) metrics(name string) *baseStats { } b.stats[name] = &baseStats{ + name: name, time: metrics.NewHistogram(metrics.NewUniformSample(1000)), errors: metrics.NewCounter(), } @@ -47,3 +131,16 @@ func (b *base) metrics(name string) *baseStats { return b.stats[name] } + +func formatDuration(dur float64) string { + switch { + case dur < 1000: + return fmt.Sprintf("%10.0fns", dur) + case dur < 1000000: + return fmt.Sprintf("%10.3fμs", dur/1000) + case dur < 1000000000: + return fmt.Sprintf("%10.3fms", dur/1000000) + default: + return fmt.Sprintf("%10.3fs", dur/1000000000) + } +} diff --git a/stats/basic.go b/stats/basic.go new file mode 100644 index 0000000..c89d4ed --- /dev/null +++ b/stats/basic.go @@ -0,0 +1,12 @@ +package stats + +type Basic struct { + base +} + +func NewBasicStats() *Basic { + b := &Basic{} + b.init() + + return b +} diff --git a/stats/group.go b/stats/group.go index 8e86501..7644822 100644 --- a/stats/group.go +++ b/stats/group.go @@ -2,15 +2,13 @@ package stats import ( "time" - - "github.com/localhots/satan" ) type Group struct { - backends []satan.StatsPublisher + backends []Publisher } -func NewGroup(backends ...satan.StatsPublisher) *Group { +func NewGroup(backends ...Publisher) *Group { return &Group{ backends: backends, } diff --git a/stats/logger.go b/stats/logger.go index d6480b9..a6291fb 100644 --- a/stats/logger.go +++ b/stats/logger.go @@ -1,7 +1,6 @@ package stats import ( - "fmt" "io" "os" "time" @@ -30,24 +29,9 @@ func NewStdoutLogger(interval time.Duration) *Logger { } func (l *Logger) Print() { - for name, s := range l.stats { - fmt.Fprintf(l.out, "%s statistics:\n"+ - "Processed: %d\n"+ - "Errors: %d\n"+ - "Min: %.8fms\n"+ - "Max: %.8fms\n"+ - "95%%: %.8fms\n"+ - "Mean: %.8fms\n"+ - "StdDev: %.8fms\n", - name, - s.time.Count(), - s.errors.Count(), - float64(s.time.Min())/1000000, - float64(s.time.Max())/1000000, - s.time.Percentile(0.95)/1000000, - s.time.Mean()/1000000, - s.time.StdDev()/1000000, - ) + for _, s := range l.stats { + l.out.Write([]byte(s.String())) + l.out.Write([]byte{'\n'}) s.time.Clear() s.errors.Clear() }