diff --git a/hub/hub.go b/hub/hub.go index 235907f..4813135 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -47,9 +47,9 @@ func (h *Hub) Pub(queue string, msg []byte) bool { } func (h *Hub) Sub(s *Subscription) { - for _, q := range s.Queues { - if msg, ok := h.storage.Get(q); ok { - s.Send(Message{q, msg}) + for _, queue := range s.Queues { + if msg, ok := h.storage.Get(queue, s.Done()); ok { + s.Send(Message{queue, msg}) return } } diff --git a/storage/storage.go b/storage/storage.go index f2f8beb..1fea3b3 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -35,7 +35,7 @@ func New(path string) (s *Storage, err error) { return } -func (s *Storage) Get(queue string) (message []byte, ok bool) { +func (s *Storage) Get(queue string, done <-chan struct{}) (message []byte, ok bool) { if _, exist := s.counters[queue]; !exist { return } @@ -46,7 +46,7 @@ func (s *Storage) Get(queue string) (message []byte, ok bool) { var index uint select { case index = <-s.counters[queue].stream: - default: + case <-done: return }