Add statistics interface
This commit is contained in:
parent
d2041b69a1
commit
0e5670707f
@ -4,7 +4,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"runtime/debug"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -50,7 +49,6 @@ type BaseDaemon struct {
|
|||||||
publisher Publisher
|
publisher Publisher
|
||||||
self Daemon
|
self Daemon
|
||||||
name string
|
name string
|
||||||
stats *statistics
|
|
||||||
queue chan<- *task
|
queue chan<- *task
|
||||||
panicHandler PanicHandler
|
panicHandler PanicHandler
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
|
34
satan.go
34
satan.go
@ -13,10 +13,10 @@ import (
|
|||||||
type Satan struct {
|
type Satan struct {
|
||||||
SubscribeFunc SubscribeFunc
|
SubscribeFunc SubscribeFunc
|
||||||
Publisher Publisher
|
Publisher Publisher
|
||||||
|
Statistics Statistics
|
||||||
|
|
||||||
daemons []Daemon
|
daemons []Daemon
|
||||||
queue chan *task
|
queue chan *task
|
||||||
latency *statistics
|
|
||||||
|
|
||||||
wgWorkers sync.WaitGroup
|
wgWorkers sync.WaitGroup
|
||||||
wgSystem sync.WaitGroup
|
wgSystem sync.WaitGroup
|
||||||
@ -44,6 +44,11 @@ type Publisher interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Statistics interface {
|
||||||
|
Add(daemonName string, dur time.Duration)
|
||||||
|
Error(daemonName string)
|
||||||
|
}
|
||||||
|
|
||||||
type task struct {
|
type task struct {
|
||||||
daemon Daemon
|
daemon Daemon
|
||||||
actor Actor
|
actor Actor
|
||||||
@ -64,7 +69,6 @@ var (
|
|||||||
func Summon() *Satan {
|
func Summon() *Satan {
|
||||||
return &Satan{
|
return &Satan{
|
||||||
queue: make(chan *task),
|
queue: make(chan *task),
|
||||||
latency: newStatistics(),
|
|
||||||
shutdownWorkers: make(chan struct{}),
|
shutdownWorkers: make(chan struct{}),
|
||||||
shutdownSystem: make(chan struct{}),
|
shutdownSystem: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -78,7 +82,6 @@ func (s *Satan) AddDaemon(d Daemon) {
|
|||||||
base.publisher = s.Publisher
|
base.publisher = s.Publisher
|
||||||
base.queue = s.queue
|
base.queue = s.queue
|
||||||
base.shutdown = s.shutdownSystem
|
base.shutdown = s.shutdownSystem
|
||||||
base.stats = newStatistics()
|
|
||||||
|
|
||||||
go d.Startup()
|
go d.Startup()
|
||||||
s.daemons = append(s.daemons, d)
|
s.daemons = append(s.daemons, d)
|
||||||
@ -100,12 +103,6 @@ func (s *Satan) StopDaemons() {
|
|||||||
close(s.shutdownWorkers)
|
close(s.shutdownWorkers)
|
||||||
s.wgWorkers.Wait()
|
s.wgWorkers.Wait()
|
||||||
close(s.queue)
|
close(s.queue)
|
||||||
|
|
||||||
for _, d := range s.daemons {
|
|
||||||
stats := d.base().stats.snapshot()
|
|
||||||
log.Printf("%s daemon performace statistics:\n%s\n", d.base(), stats)
|
|
||||||
}
|
|
||||||
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Satan) addWorkers(num int) {
|
func (s *Satan) addWorkers(num int) {
|
||||||
@ -125,8 +122,8 @@ func (s *Satan) runWorker() {
|
|||||||
defer s.wgWorkers.Done()
|
defer s.wgWorkers.Done()
|
||||||
|
|
||||||
i := atomic.AddUint64(&workerIndex, 1)
|
i := atomic.AddUint64(&workerIndex, 1)
|
||||||
log.Printf("Starting worker #%d", i+1)
|
log.Printf("Starting worker #%d", i)
|
||||||
defer log.Printf("Worker #%d has stopped", i+1)
|
defer log.Printf("Worker #%d has stopped", i)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -140,7 +137,7 @@ func (s *Satan) runWorker() {
|
|||||||
|
|
||||||
func (s *Satan) processTask(t *task) {
|
func (s *Satan) processTask(t *task) {
|
||||||
dur := time.Now().UnixNano() - t.createdAt.UnixNano()
|
dur := time.Now().UnixNano() - t.createdAt.UnixNano()
|
||||||
s.latency.add(time.Duration(dur))
|
s.Statistics.Add("Latency", time.Duration(dur))
|
||||||
|
|
||||||
if t.system {
|
if t.system {
|
||||||
s.processSystemTask(t)
|
s.processSystemTask(t)
|
||||||
@ -169,17 +166,22 @@ func (s *Satan) processSystemTask(t *task) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Satan) processGeneralTask(t *task) {
|
func (s *Satan) processGeneralTask(t *task) {
|
||||||
defer func(start time.Time) {
|
|
||||||
dur := time.Now().UnixNano() - start.UnixNano()
|
|
||||||
t.daemon.base().stats.add(time.Duration(dur))
|
|
||||||
}(time.Now())
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
|
if s.Statistics != nil {
|
||||||
|
s.Statistics.Error(t.daemon.base().String())
|
||||||
|
}
|
||||||
t.daemon.base().handlePanic(err)
|
t.daemon.base().handlePanic(err)
|
||||||
log.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err)
|
log.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err)
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
if s.Statistics != nil {
|
||||||
|
defer func(start time.Time) {
|
||||||
|
dur := time.Now().UnixNano() - start.UnixNano()
|
||||||
|
s.Statistics.Add(t.daemon.base().String(), time.Duration(dur))
|
||||||
|
}(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
t.actor() // <--- THE ACTION HAPPENS HERE
|
t.actor() // <--- THE ACTION HAPPENS HERE
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user