diff --git a/example/main.go b/example/main.go index c41aa1b..60ecbc4 100644 --- a/example/main.go +++ b/example/main.go @@ -31,7 +31,7 @@ func main() { server.Start() s := satan.Summon() - s.SubscribeFunc = kafka.Subscribe + s.Subscriber = kafka.Subscriber{} s.DaemonStats = stats.NewGroup(statsLogger, statsServer) s.AddDaemon(&daemons.NumberPrinter{}) diff --git a/stats/server.go b/stats/server.go index 1cd4d22..f337727 100644 --- a/stats/server.go +++ b/stats/server.go @@ -13,9 +13,22 @@ type Server struct { history map[string][]*serverStatsSnapshot } +type serverStatsSnapshot struct { + timestamp int64 + processed int64 + errors int64 + min float64 + p25 float64 + mean float64 + median float64 + p75 float64 + max float64 +} + const ( - serverSnapshotIntervl = 3 * time.Second - serverHistorySize = 30 + // 60 of 10 second snapshots is 10 minutes worth of stats + serverSnapshotIntervl = 10 * time.Second + serverHistorySize = 61 // +1 extra ) func NewServer() *Server { @@ -58,6 +71,7 @@ func (s *Server) takeSnapshots() { func makeServerStatsSnapshot(s *baseStats) *serverStatsSnapshot { ps := s.time.Percentiles([]float64{0.25, 0.5, 0.75}) + return &serverStatsSnapshot{ timestamp: time.Now().UTC().Unix(), processed: s.time.Count(), @@ -71,18 +85,6 @@ func makeServerStatsSnapshot(s *baseStats) *serverStatsSnapshot { } } -type serverStatsSnapshot struct { - timestamp int64 - processed int64 - errors int64 - min float64 - p25 float64 - mean float64 - median float64 - p75 float64 - max float64 -} - // Implements json.Marshaler func (s *serverStatsSnapshot) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("[%d,%d,%d,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f]",