1
0
Fork 0

Working server with 5s stats snapshots

This commit is contained in:
Gregory Eremin 2015-10-28 01:23:39 +03:00
parent 3c77fec5e3
commit bd345af332
4 changed files with 66 additions and 61 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/localhots/satan" "github.com/localhots/satan"
"github.com/localhots/satan/example/daemons" "github.com/localhots/satan/example/daemons"
"github.com/localhots/satan/example/kafka" "github.com/localhots/satan/example/kafka"
"github.com/localhots/satan/server"
"github.com/localhots/satan/stats" "github.com/localhots/satan/stats"
) )
@ -24,9 +25,13 @@ func main() {
statsLogger := stats.NewStdoutLogger(0) statsLogger := stats.NewStdoutLogger(0)
defer statsLogger.Print() defer statsLogger.Print()
statsServer := stats.NewServer()
server := server.New(6464, statsServer)
server.Start()
s := satan.Summon() s := satan.Summon()
s.SubscribeFunc = kafka.Subscribe s.SubscribeFunc = kafka.Subscribe
s.DaemonStats = statsLogger s.DaemonStats = stats.NewGroup(statsLogger, statsServer)
s.AddDaemon(&daemons.NumberPrinter{}) s.AddDaemon(&daemons.NumberPrinter{})
s.AddDaemon(&daemons.PriceConsumer{}) s.AddDaemon(&daemons.PriceConsumer{})

View File

@ -3,25 +3,26 @@ package server
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/localhots/satan/stats"
) )
type Server struct { type Server struct {
port int port int
ss *stats.Server
mux *http.ServeMux mux *http.ServeMux
} }
func NewServer(port int) *Server { func New(port int, ss *stats.Server) *Server {
return &Server{ return &Server{
port: port, port: port,
ss: ss,
mux: http.NewServeMux(), mux: http.NewServeMux(),
} }
} }
func (s *Server) Handle(pattern string, handler http.Handler) {
s.mux.Handle(pattern, handler)
}
func (s *Server) Start() { func (s *Server) Start() {
addr := fmt.Sprintf(":%d", port) addr := fmt.Sprintf(":%d", s.port)
s.mux.HandleFunc("/stats.json", s.ss.History)
go http.ListenAndServe(addr, s.mux) go http.ListenAndServe(addr, s.mux)
} }

View File

@ -2,6 +2,7 @@ package stats
import ( import (
"fmt" "fmt"
"math"
"sync" "sync"
"time" "time"
@ -151,13 +152,14 @@ func (s *baseStats) String() string {
func (s *baseStats) snapshot() *baseSnapshot { func (s *baseStats) snapshot() *baseSnapshot {
return &baseSnapshot{ return &baseSnapshot{
Timestamp: time.Now().UTC().Unix(),
ProcessedVal: s.time.Count(), ProcessedVal: s.time.Count(),
ErrorsVal: s.errors.Count(), ErrorsVal: s.errors.Count(),
MinVal: s.time.Min(), MinVal: round(float64(s.time.Min())/1000000, 6),
MeanVal: s.time.Mean(), MeanVal: round(s.time.Mean()/1000000, 6),
P95Val: s.time.Percentile(0.95), P95Val: round(s.time.Percentile(0.95)/1000000, 6),
MaxVal: s.time.Max(), MaxVal: round(float64(s.time.Max())/1000000, 6),
StdDevVal: s.time.StdDev(), StdDevVal: round(s.time.StdDev()/1000000, 6),
} }
} }
@ -166,41 +168,21 @@ func (s *baseStats) snapshot() *baseSnapshot {
// //
type baseSnapshot struct { type baseSnapshot struct {
ProcessedVal int64 `json:"processed"` timestamp int64
ErrorsVal int64 `json:"errors"` processed int64
MinVal int64 `json:"min"` errors int64
MeanVal float64 `json:"mean"` min float64
P95Val float64 `json:"95%"` mean float64
MaxVal int64 `json:"max"` p95 float64
StdDevVal float64 `json:"stddev"` max float64
stddev float64
} }
func (s *baseSnapshot) Processed() int64 { // Implements json.Marshaler
return s.ProcessedVal func (s *baseSnapshot) MarshalJSON() ([]byte, error) {
} return []byte(fmt.Sprintf("[%d,%d,%d,%.6f,%.6f,%.6f,%.6f,%.6f]",
s.timestamp, s.processed, s.errors, s.min,
func (s *baseSnapshot) Errors() int64 { s.mean, s.p95, s.max, s.stddev)), nil
return s.ErrorsVal
}
func (s *baseSnapshot) Min() int64 {
return s.MinVal
}
func (s *baseSnapshot) Mean() float64 {
return s.MeanVal
}
func (s *baseSnapshot) P95() float64 {
return s.P95Val
}
func (s *baseSnapshot) Max() int64 {
return s.MaxVal
}
func (s *baseSnapshot) StdDev() float64 {
return s.StdDevVal
} }
// //
@ -219,3 +201,8 @@ func formatDuration(dur float64) string {
return fmt.Sprintf("%10.3fs", dur/1000000000) return fmt.Sprintf("%10.3fs", dur/1000000000)
} }
} }
func round(num float64, decimals int) float64 {
pow := math.Pow(10, float64(decimals))
return float64(int(num*pow+0.5)) / pow
}

View File

@ -2,38 +2,50 @@ package stats
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"time"
) )
type Server struct { type Server struct {
base base
history map[string][]*baseSnapshot
} }
const (
serverSnapshotIntervl = 5 * time.Second
serverHistorySize = 360 // 30 minutes of 5 second snapshots
)
func NewServer() *Server { func NewServer() *Server {
s := &Server{} s := &Server{}
s.init() s.init()
s.history = make(map[string][]*baseSnapshot)
go s.takeSnapshots()
return s return s
} }
func (s *Server) ServeHTTP(rw http.ResponseWriter, _ *http.Request) { func (s *Server) History(rw http.ResponseWriter, _ *http.Request) {
stats := make(map[string]map[string]interface{}) encoded, err := json.Marshal(s.history)
for name, stat := range s.stats {
stats[name] = map[string]interface{}{
"processed": stat.time.Count(),
"errors": stat.errors.Count(),
"min": float64(stat.time.Min()) / 1000000,
"mean": stat.time.Mean() / 1000000,
"95%": stat.time.Percentile(0.95) / 1000000,
"max": float64(stat.time.Max()) / 1000000,
"stddev": stat.time.StdDev() / 1000000,
}
}
encoded, err := json.MarshalIndent(stats, "", " ")
if err != nil { if err != nil {
panic(err) http.Error(rw, fmt.Sprintf("%v", err), http.StatusInternalServerError)
return
} }
rw.Write(encoded) rw.Write(encoded)
} }
func (s *Server) takeSnapshots() {
for range time.NewTicker(serverSnapshotIntervl).C {
s.Lock()
for name, stat := range s.stats {
if len(s.history[name]) >= serverHistorySize {
s.history[name] = s.history[name][1:]
}
s.history[name] = append(s.history[name], stat.snapshot())
}
s.Unlock()
}
}