diff --git a/hub/hub.go b/hub/hub.go index 445e1fc..db27585 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "github.com/KosyanMedia/burlesque/stats" "github.com/KosyanMedia/burlesque/storage" ) @@ -12,6 +13,7 @@ type ( storage *storage.Storage subscribers []*Subscription lock sync.Mutex + statistics *stats.Stats } Message struct { Queue string @@ -27,6 +29,7 @@ func New(st *storage.Storage) *Hub { h := &Hub{ storage: st, subscribers: []*Subscription{}, + statistics: stats.New(), } go h.cleanupEverySecond() @@ -35,6 +38,8 @@ func New(st *storage.Storage) *Hub { } func (h *Hub) Pub(queue string, msg []byte) bool { + h.statistics.AddMessage(queue) + for _, s := range h.subscribers { if ok := s.Need(queue); ok { // Check if subscription is already served @@ -45,6 +50,7 @@ func (h *Hub) Pub(queue string, msg []byte) bool { } if ok := s.Send(Message{queue, msg}); ok { + h.statistics.AddDelivery(queue) return true } } @@ -57,9 +63,11 @@ func (h *Hub) Pub(queue string, msg []byte) bool { func (h *Hub) Sub(s *Subscription) { for _, queue := range s.Queues { - if msg, ok := h.storage.Get(queue, s.Done()); ok { - s.Send(Message{queue, msg}) - return + if msg, okGot := h.storage.Get(queue, s.Done()); okGot { + if okSent := s.Send(Message{queue, msg}); okSent { + h.statistics.AddDelivery(queue) + return + } } } @@ -82,9 +90,12 @@ func (h *Hub) Info() map[string]map[string]int64 { info := make(map[string]map[string]int64) for queue, size := range h.storage.QueueSizes() { + inRate, outRate := h.statistics.Rates(queue) info[queue] = map[string]int64{ "messages": size, "subscriptions": 0, + "in_rate": inRate, + "out_rate": outRate, } } for _, sub := range h.subscribers { @@ -102,6 +113,15 @@ func (h *Hub) Info() map[string]map[string]int64 { return info } +func (h *Hub) RateHistory() map[string]map[string][]int64 { + hist := map[string]map[string][]int64{} + for queue, _ := range h.storage.QueueSizes() { + hist[queue] = h.statistics.RateHistory(queue) + } + + return hist +} + func (h *Hub) StorageInfo() map[string]interface{} { return h.storage.Info() }