1
0
Fork 0
burlesque/hub/hub.go

150 lines
2.7 KiB
Go
Raw Permalink Normal View History

2014-09-10 11:44:38 +00:00
package hub
import (
2014-09-11 10:27:03 +00:00
"sync"
2014-09-11 11:11:48 +00:00
"time"
2014-09-11 10:27:03 +00:00
2015-01-27 06:07:17 +00:00
"github.com/KosyanMedia/burlesque/stats"
2014-09-10 11:44:38 +00:00
"github.com/KosyanMedia/burlesque/storage"
)
type (
Hub struct {
storage *storage.Storage
2014-09-10 12:52:18 +00:00
subscribers []*Subscription
2014-09-11 10:27:03 +00:00
lock sync.Mutex
2015-01-27 06:07:17 +00:00
statistics *stats.Stats
2014-09-10 11:44:38 +00:00
}
2014-09-24 15:36:09 +00:00
Message struct {
Queue string
Message []byte
}
2014-09-24 15:37:33 +00:00
MessageDump struct {
Queue string `json:"queue"`
Message string `json:"message"`
}
2014-09-10 11:44:38 +00:00
)
2014-09-10 12:52:18 +00:00
func New(st *storage.Storage) *Hub {
2014-09-11 11:11:48 +00:00
h := &Hub{
2014-09-10 12:52:18 +00:00
storage: st,
subscribers: []*Subscription{},
2015-01-27 06:07:17 +00:00
statistics: stats.New(),
2014-09-10 12:52:18 +00:00
}
2014-09-11 11:11:48 +00:00
2014-09-11 18:53:53 +00:00
go h.cleanupEverySecond()
2014-09-11 11:11:48 +00:00
return h
2014-09-10 11:44:38 +00:00
}
2014-09-10 12:52:18 +00:00
func (h *Hub) Pub(queue string, msg []byte) bool {
2015-01-27 06:07:17 +00:00
h.statistics.AddMessage(queue)
2014-09-10 12:52:18 +00:00
for _, s := range h.subscribers {
2014-09-11 10:27:03 +00:00
if ok := s.Need(queue); ok {
2015-01-26 18:03:51 +00:00
// Check if subscription is already served
2014-09-10 12:52:18 +00:00
select {
case <-s.Done():
continue
default:
}
2014-09-24 12:52:31 +00:00
if ok := s.Send(Message{queue, msg}); ok {
2015-01-27 06:07:17 +00:00
h.statistics.AddDelivery(queue)
2014-09-10 12:52:18 +00:00
return true
}
}
}
err := h.storage.Put(queue, msg)
return (err == nil)
2014-09-10 11:44:38 +00:00
}
2014-09-10 12:52:18 +00:00
func (h *Hub) Sub(s *Subscription) {
2014-09-24 15:34:39 +00:00
for _, queue := range s.Queues {
2015-01-27 06:07:17 +00:00
if msg, okGot := h.storage.Get(queue, s.Done()); okGot {
if okSent := s.Send(Message{queue, msg}); okSent {
h.statistics.AddDelivery(queue)
return
}
2014-09-11 10:27:03 +00:00
}
}
h.subscribers = append(h.subscribers, s)
}
func (h *Hub) Flush(queues []string) []MessageDump {
messages := []MessageDump{}
2014-09-24 15:37:33 +00:00
for _, queue := range queues {
for _, msg := range h.storage.Flush(queue) {
messages = append(messages, MessageDump{queue, string(msg)})
}
}
return messages
2014-09-24 15:37:33 +00:00
}
2015-01-26 18:03:51 +00:00
func (h *Hub) Info() map[string]map[string]int64 {
info := make(map[string]map[string]int64)
2014-09-11 19:08:12 +00:00
2014-09-24 10:13:08 +00:00
for queue, size := range h.storage.QueueSizes() {
2015-01-26 18:03:51 +00:00
info[queue] = map[string]int64{
2014-09-11 19:08:12 +00:00
"messages": size,
"subscriptions": 0,
}
}
for _, sub := range h.subscribers {
for _, queue := range sub.Queues {
if _, ok := info[queue]; !ok {
2015-01-26 18:03:51 +00:00
info[queue] = map[string]int64{"messages": 0}
2014-09-11 19:08:12 +00:00
}
if _, ok := info[queue]["subscriptions"]; !ok {
info[queue]["subscriptions"] = 0
}
info[queue]["subscriptions"]++
2014-09-11 19:08:12 +00:00
}
}
return info
}
2015-01-27 06:57:44 +00:00
// Rates returns the in and out values that the statistics collector reports
// for queue.
func (h *Hub) Rates(queue string) (in, out int64) {
	in, out = h.statistics.Rates(queue)
	return in, out
}
2015-01-27 06:07:17 +00:00
2015-01-27 06:57:44 +00:00
func (h *Hub) RateHistory(queue string) (in, out []int64) {
return h.statistics.RateHistory(queue)
2015-01-27 06:07:17 +00:00
}
2014-09-24 10:13:08 +00:00
// StorageInfo returns whatever the underlying storage reports from its Info
// method, unchanged.
func (h *Hub) StorageInfo() map[string]interface{} {
	details := h.storage.Info()
	return details
}
2014-09-11 18:53:53 +00:00
func (h *Hub) cleanupEverySecond() {
2014-09-11 11:11:48 +00:00
t := time.NewTicker(1 * time.Second)
for {
select {
case <-t.C:
h.cleanup()
}
}
}
2014-09-11 10:27:03 +00:00
func (h *Hub) cleanup() {
h.lock.Lock()
defer h.lock.Unlock()
deleted := 0
for i, s := range h.subscribers {
select {
case <-s.Done():
h.subscribers = append(h.subscribers[:i-deleted], h.subscribers[i-deleted+1:]...)
deleted++
default:
}
2014-09-10 12:52:18 +00:00
}
2014-09-10 11:44:38 +00:00
}