1
0
Fork 0

Implement statistics collection

This commit is contained in:
Gregory Eremin 2015-01-27 13:07:17 +07:00
parent a51d2c5cb4
commit ee789dacb5
1 changed files with 23 additions and 3 deletions

View File

@ -4,6 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/KosyanMedia/burlesque/stats"
"github.com/KosyanMedia/burlesque/storage" "github.com/KosyanMedia/burlesque/storage"
) )
@ -12,6 +13,7 @@ type (
storage *storage.Storage storage *storage.Storage
subscribers []*Subscription subscribers []*Subscription
lock sync.Mutex lock sync.Mutex
statistics *stats.Stats
} }
Message struct { Message struct {
Queue string Queue string
@ -27,6 +29,7 @@ func New(st *storage.Storage) *Hub {
h := &Hub{ h := &Hub{
storage: st, storage: st,
subscribers: []*Subscription{}, subscribers: []*Subscription{},
statistics: stats.New(),
} }
go h.cleanupEverySecond() go h.cleanupEverySecond()
@ -35,6 +38,8 @@ func New(st *storage.Storage) *Hub {
} }
func (h *Hub) Pub(queue string, msg []byte) bool { func (h *Hub) Pub(queue string, msg []byte) bool {
h.statistics.AddMessage(queue)
for _, s := range h.subscribers { for _, s := range h.subscribers {
if ok := s.Need(queue); ok { if ok := s.Need(queue); ok {
// Check if subscription is already served // Check if subscription is already served
@ -45,6 +50,7 @@ func (h *Hub) Pub(queue string, msg []byte) bool {
} }
if ok := s.Send(Message{queue, msg}); ok { if ok := s.Send(Message{queue, msg}); ok {
h.statistics.AddDelivery(queue)
return true return true
} }
} }
@ -57,11 +63,13 @@ func (h *Hub) Pub(queue string, msg []byte) bool {
func (h *Hub) Sub(s *Subscription) { func (h *Hub) Sub(s *Subscription) {
for _, queue := range s.Queues { for _, queue := range s.Queues {
if msg, ok := h.storage.Get(queue, s.Done()); ok { if msg, okGot := h.storage.Get(queue, s.Done()); okGot {
s.Send(Message{queue, msg}) if okSent := s.Send(Message{queue, msg}); okSent {
h.statistics.AddDelivery(queue)
return return
} }
} }
}
h.subscribers = append(h.subscribers, s) h.subscribers = append(h.subscribers, s)
} }
@ -82,9 +90,12 @@ func (h *Hub) Info() map[string]map[string]int64 {
info := make(map[string]map[string]int64) info := make(map[string]map[string]int64)
for queue, size := range h.storage.QueueSizes() { for queue, size := range h.storage.QueueSizes() {
inRate, outRate := h.statistics.Rates(queue)
info[queue] = map[string]int64{ info[queue] = map[string]int64{
"messages": size, "messages": size,
"subscriptions": 0, "subscriptions": 0,
"in_rate": inRate,
"out_rate": outRate,
} }
} }
for _, sub := range h.subscribers { for _, sub := range h.subscribers {
@ -102,6 +113,15 @@ func (h *Hub) Info() map[string]map[string]int64 {
return info return info
} }
func (h *Hub) RateHistory() map[string]map[string][]int64 {
hist := map[string]map[string][]int64{}
for queue, _ := range h.storage.QueueSizes() {
hist[queue] = h.statistics.RateHistory(queue)
}
return hist
}
func (h *Hub) StorageInfo() map[string]interface{} { func (h *Hub) StorageInfo() map[string]interface{} {
return h.storage.Info() return h.storage.Info()
} }