1
0
Fork 0
burlesque/hub/hub.go

69 lines
1.0 KiB
Go
Raw Normal View History

2014-09-10 11:44:38 +00:00
package hub
import (
2014-09-11 10:27:03 +00:00
"sync"
2014-09-10 11:44:38 +00:00
"github.com/KosyanMedia/burlesque/storage"
)
type (
Hub struct {
storage *storage.Storage
2014-09-10 12:52:18 +00:00
subscribers []*Subscription
2014-09-11 10:27:03 +00:00
lock sync.Mutex
2014-09-10 11:44:38 +00:00
}
)
2014-09-10 12:52:18 +00:00
func New(st *storage.Storage) *Hub {
return &Hub{
storage: st,
subscribers: []*Subscription{},
}
2014-09-10 11:44:38 +00:00
}
2014-09-10 12:52:18 +00:00
func (h *Hub) Pub(queue string, msg []byte) bool {
for _, s := range h.subscribers {
2014-09-11 10:27:03 +00:00
if ok := s.Need(queue); ok {
2014-09-10 12:52:18 +00:00
select {
case <-s.Done():
continue
default:
}
2014-09-11 10:27:03 +00:00
if ok := s.Send(Result{queue, msg}); ok {
2014-09-10 12:52:18 +00:00
return true
}
}
}
err := h.storage.Put(queue, msg)
return (err == nil)
2014-09-10 11:44:38 +00:00
}
2014-09-10 12:52:18 +00:00
func (h *Hub) Sub(s *Subscription) {
2014-09-11 10:27:03 +00:00
for _, q := range s.Queues {
if msg, ok := h.storage.Get(q); ok {
s.Send(Result{q, msg})
return
}
}
h.subscribers = append(h.subscribers, s)
}
func (h *Hub) cleanup() {
h.lock.Lock()
defer h.lock.Unlock()
deleted := 0
for i, s := range h.subscribers {
select {
case <-s.Done():
h.subscribers = append(h.subscribers[:i-deleted], h.subscribers[i-deleted+1:]...)
deleted++
default:
}
2014-09-10 12:52:18 +00:00
}
2014-09-10 11:44:38 +00:00
}