Hub cleanup

This commit is contained in:
Gregory Eremin 2014-09-11 14:27:03 +04:00
parent 76403807db
commit 638a61e8ab

View File

@ -1,6 +1,8 @@
package hub package hub
import ( import (
"sync"
"github.com/KosyanMedia/burlesque/storage" "github.com/KosyanMedia/burlesque/storage"
) )
@ -8,6 +10,7 @@ type (
Hub struct { Hub struct {
storage *storage.Storage storage *storage.Storage
subscribers []*Subscription subscribers []*Subscription
lock sync.Mutex
} }
) )
@ -20,15 +23,14 @@ func New(st *storage.Storage) *Hub {
func (h *Hub) Pub(queue string, msg []byte) bool { func (h *Hub) Pub(queue string, msg []byte) bool {
for _, s := range h.subscribers { for _, s := range h.subscribers {
if s.Queue == queue { if ok := s.Need(queue); ok {
select { select {
case <-s.Done(): case <-s.Done():
// FIXME: Cleanup needed
continue continue
default: default:
} }
if ok := s.Send(msg); ok { if ok := s.Send(Result{queue, msg}); ok {
return true return true
} }
} }
@ -40,10 +42,27 @@ func (h *Hub) Pub(queue string, msg []byte) bool {
} }
func (h *Hub) Sub(s *Subscription) { func (h *Hub) Sub(s *Subscription) {
if msg, ok := h.storage.Get(s.Queue); ok { for _, q := range s.Queues {
s.Send(msg) if msg, ok := h.storage.Get(q); ok {
} else { s.Send(Result{q, msg})
// FIXME: Race condition return
}
}
h.subscribers = append(h.subscribers, s) h.subscribers = append(h.subscribers, s)
} }
func (h *Hub) cleanup() {
h.lock.Lock()
defer h.lock.Unlock()
deleted := 0
for i, s := range h.subscribers {
select {
case <-s.Done():
h.subscribers = append(h.subscribers[:i-deleted], h.subscribers[i-deleted+1:]...)
deleted++
default:
}
}
} }