1
0
Fork 0

Move all stats related code to stats package

This commit is contained in:
Gregory Eremin 2015-10-27 02:54:00 +03:00
parent 1cace592e6
commit 6f56486566
6 changed files with 131 additions and 53 deletions

View File

@ -26,7 +26,7 @@ func main() {
s := satan.Summon() s := satan.Summon()
s.SubscribeFunc = kafka.Subscribe s.SubscribeFunc = kafka.Subscribe
s.Statistics = statsLogger s.DaemonStats = statsLogger
s.AddDaemon(&daemons.NumberPrinter{}) s.AddDaemon(&daemons.NumberPrinter{})
s.AddDaemon(&daemons.PriceConsumer{}) s.AddDaemon(&daemons.PriceConsumer{})

View File

@ -8,17 +8,20 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/localhots/satan/stats"
) )
// Satan is the master daemon. // Satan is the master daemon.
type Satan struct { type Satan struct {
SubscribeFunc SubscribeFunc SubscribeFunc SubscribeFunc
Publisher Publisher Publisher Publisher
Statistics StatsPublisher DaemonStats stats.Publisher
Logger *log.Logger Logger *log.Logger
daemons []Daemon daemons []Daemon
queue chan *task queue chan *task
runtimeStats stats.Manager
wgWorkers sync.WaitGroup wgWorkers sync.WaitGroup
wgSystem sync.WaitGroup wgSystem sync.WaitGroup
@ -46,26 +49,6 @@ type Publisher interface {
Close() Close()
} }
type StatsManager interface {
StatsPublisher
StatsFetcher
}
type StatsPublisher interface {
Add(name string, dur time.Duration)
Error(name string)
}
type StatsFetcher interface {
Processed(name string) int64
Errors(name string) int64
Min(name string) int64
Max(name string) int64
P95(name string) float64
Mean(name string) float64
StdDev(name string) float64
}
type task struct { type task struct {
daemon Daemon daemon Daemon
actor Actor actor Actor
@ -87,6 +70,7 @@ func Summon() *Satan {
return &Satan{ return &Satan{
Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags), Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags),
queue: make(chan *task), queue: make(chan *task),
runtimeStats: stats.NewBasicStats(),
shutdownWorkers: make(chan struct{}), shutdownWorkers: make(chan struct{}),
shutdownSystem: make(chan struct{}), shutdownSystem: make(chan struct{}),
} }
@ -122,6 +106,9 @@ func (s *Satan) StopDaemons() {
close(s.shutdownWorkers) close(s.shutdownWorkers)
s.wgWorkers.Wait() s.wgWorkers.Wait()
close(s.queue) close(s.queue)
fmt.Println(s.runtimeStats.Fetch(stats.Latency))
fmt.Println(s.runtimeStats.Fetch(stats.TaskWait))
} }
func (s *Satan) addWorkers(num int) { func (s *Satan) addWorkers(num int) {
@ -156,7 +143,7 @@ func (s *Satan) runWorker() {
select { select {
case t := <-s.queue: case t := <-s.queue:
dur := time.Now().UnixNano() - start.UnixNano() dur := time.Now().UnixNano() - start.UnixNano()
s.Statistics.Add("TaskWait", time.Duration(dur)) s.runtimeStats.Add(stats.TaskWait, time.Duration(dur))
s.processTask(t) s.processTask(t)
case <-s.shutdownWorkers: case <-s.shutdownWorkers:
s.Logger.Printf("Worker #%d has stopped", i) s.Logger.Printf("Worker #%d has stopped", i)
@ -167,7 +154,7 @@ func (s *Satan) runWorker() {
func (s *Satan) processTask(t *task) { func (s *Satan) processTask(t *task) {
dur := time.Now().UnixNano() - t.createdAt.UnixNano() dur := time.Now().UnixNano() - t.createdAt.UnixNano()
s.Statistics.Add("Latency", time.Duration(dur)) s.runtimeStats.Add(stats.Latency, time.Duration(dur))
if t.system { if t.system {
s.processSystemTask(t) s.processSystemTask(t)
@ -198,18 +185,18 @@ func (s *Satan) processSystemTask(t *task) {
func (s *Satan) processGeneralTask(t *task) { func (s *Satan) processGeneralTask(t *task) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
if s.Statistics != nil { if s.DaemonStats != nil {
s.Statistics.Error(t.daemon.base().String()) s.DaemonStats.Error(t.daemon.base().String())
} }
t.daemon.base().handlePanic(err) t.daemon.base().handlePanic(err)
s.Logger.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err) s.Logger.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err)
debug.PrintStack() debug.PrintStack()
} }
}() }()
if s.Statistics != nil { if s.DaemonStats != nil {
defer func(start time.Time) { defer func(start time.Time) {
dur := time.Now().UnixNano() - start.UnixNano() dur := time.Now().UnixNano() - start.UnixNano()
s.Statistics.Add(t.daemon.base().String(), time.Duration(dur)) s.DaemonStats.Add(t.daemon.base().String(), time.Duration(dur))
}(time.Now()) }(time.Now())
} }

View File

@ -1,22 +1,53 @@
package stats package stats
import ( import (
"fmt"
"sync" "sync"
"time" "time"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
) )
type Manager interface {
Publisher
Fetcher
}
type Publisher interface {
Add(name string, dur time.Duration)
Error(name string)
}
type Fetcher interface {
Fetch(name string) Stats
}
type Stats interface {
Processed() int64
Errors() int64
Min() int64
Max() int64
P95() float64
Mean() float64
StdDev() float64
}
type base struct { type base struct {
sync.Mutex sync.Mutex
stats map[string]*baseStats stats map[string]*baseStats
} }
type baseStats struct { type baseStats struct {
name string
time metrics.Histogram time metrics.Histogram
errors metrics.Counter errors metrics.Counter
} }
const (
Latency = "Latency"
TaskWait = "TaskWait"
)
func (b *base) Add(name string, dur time.Duration) { func (b *base) Add(name string, dur time.Duration) {
b.metrics(name).time.Update(int64(dur)) b.metrics(name).time.Update(int64(dur))
} }
@ -25,6 +56,58 @@ func (b *base) Error(name string) {
b.metrics(name).errors.Inc(1) b.metrics(name).errors.Inc(1)
} }
func (b *base) Fetch(name string) Stats {
return b.metrics(name)
}
func (s *baseStats) Processed() int64 {
return s.time.Count()
}
func (s *baseStats) Errors() int64 {
return s.errors.Count()
}
func (s *baseStats) Min() int64 {
return s.time.Min()
}
func (s *baseStats) Max() int64 {
return s.time.Max()
}
func (s *baseStats) P95() float64 {
return s.time.Percentile(0.95)
}
func (s *baseStats) Mean() float64 {
return s.time.Mean()
}
func (s *baseStats) StdDev() float64 {
return s.time.StdDev()
}
func (s *baseStats) String() string {
return fmt.Sprintf("%s statistics:\n"+
"Processed: %10d\n"+
"Errors: %10d\n"+
"Min: %10s\n"+
"Mean: %10s\n"+
"95%%: %10s\n"+
"Max: %10s\n"+
"StdDev: %10s",
s.name,
s.time.Count(),
s.errors.Count(),
formatDuration(float64(s.time.Min())),
formatDuration(s.time.Mean()),
formatDuration(s.time.Percentile(0.95)),
formatDuration(float64(s.time.Max())),
formatDuration(s.time.StdDev()),
)
}
func (b *base) init() { func (b *base) init() {
b.stats = make(map[string]*baseStats) b.stats = make(map[string]*baseStats)
} }
@ -40,6 +123,7 @@ func (b *base) metrics(name string) *baseStats {
} }
b.stats[name] = &baseStats{ b.stats[name] = &baseStats{
name: name,
time: metrics.NewHistogram(metrics.NewUniformSample(1000)), time: metrics.NewHistogram(metrics.NewUniformSample(1000)),
errors: metrics.NewCounter(), errors: metrics.NewCounter(),
} }
@ -47,3 +131,16 @@ func (b *base) metrics(name string) *baseStats {
return b.stats[name] return b.stats[name]
} }
func formatDuration(dur float64) string {
switch {
case dur < 1000:
return fmt.Sprintf("%10.0fns", dur)
case dur < 1000000:
return fmt.Sprintf("%10.3fμs", dur/1000)
case dur < 1000000000:
return fmt.Sprintf("%10.3fms", dur/1000000)
default:
return fmt.Sprintf("%10.3fs", dur/1000000000)
}
}

12
stats/basic.go Normal file
View File

@ -0,0 +1,12 @@
package stats
type Basic struct {
base
}
func NewBasicStats() *Basic {
b := &Basic{}
b.init()
return b
}

View File

@ -2,15 +2,13 @@ package stats
import ( import (
"time" "time"
"github.com/localhots/satan"
) )
type Group struct { type Group struct {
backends []satan.StatsPublisher backends []Publisher
} }
func NewGroup(backends ...satan.StatsPublisher) *Group { func NewGroup(backends ...Publisher) *Group {
return &Group{ return &Group{
backends: backends, backends: backends,
} }

View File

@ -1,7 +1,6 @@
package stats package stats
import ( import (
"fmt"
"io" "io"
"os" "os"
"time" "time"
@ -30,24 +29,9 @@ func NewStdoutLogger(interval time.Duration) *Logger {
} }
func (l *Logger) Print() { func (l *Logger) Print() {
for name, s := range l.stats { for _, s := range l.stats {
fmt.Fprintf(l.out, "%s statistics:\n"+ l.out.Write([]byte(s.String()))
"Processed: %d\n"+ l.out.Write([]byte{'\n'})
"Errors: %d\n"+
"Min: %.8fms\n"+
"Max: %.8fms\n"+
"95%%: %.8fms\n"+
"Mean: %.8fms\n"+
"StdDev: %.8fms\n",
name,
s.time.Count(),
s.errors.Count(),
float64(s.time.Min())/1000000,
float64(s.time.Max())/1000000,
s.time.Percentile(0.95)/1000000,
s.time.Mean()/1000000,
s.time.StdDev()/1000000,
)
s.time.Clear() s.time.Clear()
s.errors.Clear() s.errors.Clear()
} }