diff --git a/example/main.go b/example/main.go index 674c31c..c9261a9 100644 --- a/example/main.go +++ b/example/main.go @@ -9,6 +9,7 @@ import ( "github.com/localhots/satan" "github.com/localhots/satan/example/daemons" "github.com/localhots/satan/example/kafka" + "github.com/localhots/satan/server" "github.com/localhots/satan/stats" ) @@ -24,9 +25,13 @@ func main() { statsLogger := stats.NewStdoutLogger(0) defer statsLogger.Print() + statsServer := stats.NewServer() + server := server.New(6464, statsServer) + server.Start() + s := satan.Summon() s.SubscribeFunc = kafka.Subscribe - s.DaemonStats = statsLogger + s.DaemonStats = stats.NewGroup(statsLogger, statsServer) s.AddDaemon(&daemons.NumberPrinter{}) s.AddDaemon(&daemons.PriceConsumer{}) diff --git a/server/server.go b/server/server.go index fe23076..5c17013 100644 --- a/server/server.go +++ b/server/server.go @@ -3,25 +3,26 @@ package server import ( "fmt" "net/http" + + "github.com/localhots/satan/stats" ) type Server struct { port int + ss *stats.Server mux *http.ServeMux } -func NewServer(port int) *Server { +func New(port int, ss *stats.Server) *Server { return &Server{ port: port, + ss: ss, mux: http.NewServeMux(), } } -func (s *Server) Handle(pattern string, handler http.Handler) { - s.mux.Handle(pattern, handler) -} - func (s *Server) Start() { - addr := fmt.Sprintf(":%d", port) + addr := fmt.Sprintf(":%d", s.port) + s.mux.HandleFunc("/stats.json", s.ss.History) go http.ListenAndServe(addr, s.mux) } diff --git a/stats/base.go b/stats/base.go index c3c9255..0e882fd 100644 --- a/stats/base.go +++ b/stats/base.go @@ -2,6 +2,7 @@ package stats import ( "fmt" + "math" "sync" "time" @@ -151,13 +152,14 @@ func (s *baseStats) String() string { func (s *baseStats) snapshot() *baseSnapshot { return &baseSnapshot{ + Timestamp: time.Now().UTC().Unix(), ProcessedVal: s.time.Count(), ErrorsVal: s.errors.Count(), - MinVal: s.time.Min(), - MeanVal: s.time.Mean(), - P95Val: s.time.Percentile(0.95), - MaxVal: s.time.Max(), - StdDevVal: s.time.StdDev(), + MinVal: round(float64(s.time.Min())/1000000, 6), + MeanVal: round(s.time.Mean()/1000000, 6), + P95Val: round(s.time.Percentile(0.95)/1000000, 6), + MaxVal: round(float64(s.time.Max())/1000000, 6), + StdDevVal: round(s.time.StdDev()/1000000, 6), } } @@ -166,41 +168,21 @@ func (s *baseStats) snapshot() *baseSnapshot { // type baseSnapshot struct { - ProcessedVal int64 `json:"processed"` - ErrorsVal int64 `json:"errors"` - MinVal int64 `json:"min"` - MeanVal float64 `json:"mean"` - P95Val float64 `json:"95%"` - MaxVal int64 `json:"max"` - StdDevVal float64 `json:"stddev"` + timestamp int64 + processed int64 + errors int64 + min float64 + mean float64 + p95 float64 + max float64 + stddev float64 } -func (s *baseSnapshot) Processed() int64 { - return s.ProcessedVal -} - -func (s *baseSnapshot) Errors() int64 { - return s.ErrorsVal -} - -func (s *baseSnapshot) Min() int64 { - return s.MinVal -} - -func (s *baseSnapshot) Mean() float64 { - return s.MeanVal -} - -func (s *baseSnapshot) P95() float64 { - return s.P95Val -} - -func (s *baseSnapshot) Max() int64 { - return s.MaxVal -} - -func (s *baseSnapshot) StdDev() float64 { - return s.StdDevVal +// Implements json.Marshaler +func (s *baseSnapshot) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("[%d,%d,%d,%.6f,%.6f,%.6f,%.6f,%.6f]", + s.timestamp, s.processed, s.errors, s.min, + s.mean, s.p95, s.max, s.stddev)), nil } // @@ -219,3 +201,8 @@ func formatDuration(dur float64) string { return fmt.Sprintf("%10.3fs", dur/1000000000) } } + +func round(num float64, decimals int) float64 { + pow := math.Pow(10, float64(decimals)) + return float64(int(num*pow+0.5)) / pow +} diff --git a/stats/server.go b/stats/server.go index 7e09133..6d6ff41 100644 --- a/stats/server.go +++ b/stats/server.go @@ -2,38 +2,50 @@ package stats import ( "encoding/json" + "fmt" "net/http" + "time" ) type Server struct { base + + history map[string][]*baseSnapshot } +const ( + serverSnapshotIntervl = 5 * time.Second + serverHistorySize = 360 // 30 minutes of 5 second snapshots +) + func NewServer() *Server { s := &Server{} s.init() + s.history = make(map[string][]*baseSnapshot) + go s.takeSnapshots() return s } -func (s *Server) ServeHTTP(rw http.ResponseWriter, _ *http.Request) { - stats := make(map[string]map[string]interface{}) - for name, stat := range s.stats { - stats[name] = map[string]interface{}{ - "processed": stat.time.Count(), - "errors": stat.errors.Count(), - "min": float64(stat.time.Min()) / 1000000, - "mean": stat.time.Mean() / 1000000, - "95%": stat.time.Percentile(0.95) / 1000000, - "max": float64(stat.time.Max()) / 1000000, - "stddev": stat.time.StdDev() / 1000000, - } - } - - encoded, err := json.MarshalIndent(stats, "", " ") +func (s *Server) History(rw http.ResponseWriter, _ *http.Request) { + encoded, err := json.Marshal(s.history) if err != nil { - panic(err) + http.Error(rw, fmt.Sprintf("%v", err), http.StatusInternalServerError) + return } rw.Write(encoded) } + +func (s *Server) takeSnapshots() { + for range time.NewTicker(serverSnapshotIntervl).C { + s.Lock() + for name, stat := range s.stats { + if len(s.history[name]) >= serverHistorySize { + s.history[name] = s.history[name][1:] + } + s.history[name] = append(s.history[name], stat.snapshot()) + } + s.Unlock() + } +}