package hub import ( "sync" "time" "github.com/KosyanMedia/burlesque/storage" ) type ( Hub struct { storage *storage.Storage subscribers []*Subscription lock sync.Mutex } Message struct { Queue string Message []byte } MessageDump struct { Queue string `json:"queue"` Message string `json:"message"` } ) func New(st *storage.Storage) *Hub { h := &Hub{ storage: st, subscribers: []*Subscription{}, } go h.cleanupEverySecond() return h } func (h *Hub) Pub(queue string, msg []byte) bool { for _, s := range h.subscribers { if ok := s.Need(queue); ok { select { case <-s.Done(): continue default: } if ok := s.Send(Message{queue, msg}); ok { return true } } } err := h.storage.Put(queue, msg) return (err == nil) } func (h *Hub) Sub(s *Subscription) { for _, queue := range s.Queues { if msg, ok := h.storage.Get(queue, s.Done()); ok { s.Send(Message{queue, msg}) return } } h.subscribers = append(h.subscribers, s) } func (h *Hub) Flush(queues []string) []MessageDump { messages := []MessageDump{} for _, queue := range queues { for _, msg := range h.storage.Flush(queue) { messages = append(messages, MessageDump{queue, string(msg)}) } } return messages } func (h *Hub) Info() map[string]map[string]uint { info := make(map[string]map[string]uint) for queue, size := range h.storage.QueueSizes() { info[queue] = map[string]uint{ "messages": size, "subscriptions": 0, } } for _, sub := range h.subscribers { for _, queue := range sub.Queues { if _, ok := info[queue]; !ok { info[queue] = map[string]uint{"messages": 0} } if _, ok := info[queue]["subscriptions"]; !ok { info[queue]["subscriptions"] = 0 } info[queue]["subscriptions"]++ } } return info } func (h *Hub) StorageInfo() map[string]interface{} { return h.storage.Info() } func (h *Hub) cleanupEverySecond() { t := time.NewTicker(1 * time.Second) for { select { case <-t.C: h.cleanup() } } } func (h *Hub) cleanup() { h.lock.Lock() defer h.lock.Unlock() deleted := 0 for i, s := range h.subscribers { select { case <-s.Done(): h.subscribers = append(h.subscribers[:i-deleted], h.subscribers[i-deleted+1:]...) deleted++ default: } } }