From 638a61e8abab5fbb524a7325d635368735c4f66a Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Thu, 11 Sep 2014 14:27:03 +0400 Subject: [PATCH] Hub cleanup --- hub/hub.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/hub/hub.go b/hub/hub.go index 2487148..14d891c 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -1,6 +1,8 @@ package hub import ( + "sync" + "github.com/KosyanMedia/burlesque/storage" ) @@ -8,6 +10,7 @@ type ( Hub struct { storage *storage.Storage subscribers []*Subscription + lock sync.Mutex } ) @@ -20,15 +23,14 @@ func New(st *storage.Storage) *Hub { func (h *Hub) Pub(queue string, msg []byte) bool { for _, s := range h.subscribers { - if s.Queue == queue { + if ok := s.Need(queue); ok { select { case <-s.Done(): - // FIXME: Cleanup needed continue default: } - if ok := s.Send(msg); ok { + if ok := s.Send(Result{queue, msg}); ok { return true } } @@ -40,10 +42,27 @@ func (h *Hub) Pub(queue string, msg []byte) bool { } func (h *Hub) Sub(s *Subscription) { - if msg, ok := h.storage.Get(s.Queue); ok { - s.Send(msg) - } else { - // FIXME: Race condition - h.subscribers = append(h.subscribers, s) + for _, q := range s.Queues { + if msg, ok := h.storage.Get(q); ok { + s.Send(Result{q, msg}) + return + } + } + + h.subscribers = append(h.subscribers, s) +} + +func (h *Hub) cleanup() { + h.lock.Lock() + defer h.lock.Unlock() + + deleted := 0 + for i, s := range h.subscribers { + select { + case <-s.Done(): + h.subscribers = append(h.subscribers[:i-deleted], h.subscribers[i-deleted+1:]...) + deleted++ + default: + } } }