From 959bc56bc37105abf5a140b6724b1632aa85b7c0 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Wed, 24 Sep 2014 19:34:39 +0400 Subject: [PATCH] Pass done channel to counter --- hub/hub.go | 6 +++--- storage/storage.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hub/hub.go b/hub/hub.go index 235907f..4813135 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -47,9 +47,9 @@ func (h *Hub) Pub(queue string, msg []byte) bool { } func (h *Hub) Sub(s *Subscription) { - for _, q := range s.Queues { - if msg, ok := h.storage.Get(q); ok { - s.Send(Message{q, msg}) + for _, queue := range s.Queues { + if msg, ok := h.storage.Get(queue, s.Done()); ok { + s.Send(Message{queue, msg}) return } } diff --git a/storage/storage.go b/storage/storage.go index f2f8beb..1fea3b3 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -35,7 +35,7 @@ func New(path string) (s *Storage, err error) { return } -func (s *Storage) Get(queue string) (message []byte, ok bool) { +func (s *Storage) Get(queue string, done <-chan struct{}) (message []byte, ok bool) { if _, exist := s.counters[queue]; !exist { return } @@ -46,7 +46,7 @@ func (s *Storage) Get(queue string) (message []byte, ok bool) { var index uint select { case index = <-s.counters[queue].stream: - default: + case <-done: return }