diff --git a/hub/hub.go b/hub/hub.go index 9388f6d..445e1fc 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -37,6 +37,7 @@ func New(st *storage.Storage) *Hub { func (h *Hub) Pub(queue string, msg []byte) bool { for _, s := range h.subscribers { if ok := s.Need(queue); ok { + // Check if subscription is already served select { case <-s.Done(): continue @@ -77,11 +78,11 @@ func (h *Hub) Flush(queues []string) []MessageDump { return messages } -func (h *Hub) Info() map[string]map[string]uint { - info := make(map[string]map[string]uint) +func (h *Hub) Info() map[string]map[string]int64 { + info := make(map[string]map[string]int64) for queue, size := range h.storage.QueueSizes() { - info[queue] = map[string]uint{ + info[queue] = map[string]int64{ "messages": size, "subscriptions": 0, } @@ -89,7 +90,7 @@ func (h *Hub) Info() map[string]map[string]uint { for _, sub := range h.subscribers { for _, queue := range sub.Queues { if _, ok := info[queue]; !ok { - info[queue] = map[string]uint{"messages": 0} + info[queue] = map[string]int64{"messages": 0} } if _, ok := info[queue]["subscriptions"]; !ok { info[queue]["subscriptions"] = 0 diff --git a/storage/counter.go b/storage/counter.go index 5a83bf1..2940c39 100644 --- a/storage/counter.go +++ b/storage/counter.go @@ -5,31 +5,31 @@ import ( ) const ( - maxIndex = ^uint(0) // Max unit value + maxIndex = ^int64(0) // Max unit value ) type ( // Counter is responsible for operating queue read and write indexes counter struct { - write uint // Number of the record last written to the queue - read uint // Number of the record last read from the queue + write int64 // Number of the record last written to the queue + read int64 // Number of the record last read from the queue // If write index is greater than read index then there are unread messages // If write index is less tham read index then max index was reached mutex sync.Mutex - stream chan uint + stream chan int64 streaming *sync.Cond } ) -func newCounter(wi, ri uint) *counter { +func newCounter(wi, ri int64) *counter { m := &sync.Mutex{} m.Lock() c := &counter{ write: wi, read: ri, - stream: make(chan uint), + stream: make(chan int64), streaming: sync.NewCond(m), } @@ -38,17 +38,20 @@ func newCounter(wi, ri uint) *counter { return c } -func (c *counter) tryWrite(fn func(i uint) bool) { +func (c *counter) tryWrite(fn func(i int64) bool) { c.mutex.Lock() defer c.mutex.Unlock() if ok := fn(c.write + 1); ok { - c.write++ + if c.write++; c.write < 0 { + c.write = 0 + } + c.streaming.Signal() } } -func (c *counter) distance() uint { +func (c *counter) distance() int64 { d := c.write - c.read if d < 0 { d += maxIndex @@ -62,7 +65,12 @@ func (c *counter) increment() { c.streaming.Wait() } - c.stream <- c.read + 1 - c.read++ + next := c.read + 1 + if next < 0 { + next = 0 + } + + c.stream <- next + c.read = next } } diff --git a/storage/storage.go b/storage/storage.go index 64af24f..8b4e473 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -43,7 +43,7 @@ func (s *Storage) Get(queue string, done <-chan struct{}) (message []byte, ok bo return } - var index uint + var index int64 select { case index = <-s.counters[queue].stream: case <-done: @@ -68,7 +68,7 @@ func (s *Storage) Put(queue string, message []byte) (err error) { s.counters[queue] = newCounter(0, 0) } - s.counters[queue].tryWrite(func(index uint) bool { + s.counters[queue].tryWrite(func(index int64) bool { key := makeKey(queue, index) err = s.kyoto.Set(key, message) @@ -92,8 +92,8 @@ func (s *Storage) Flush(queue string) (messages [][]byte) { return } -func (s *Storage) QueueSizes() map[string]uint { - info := make(map[string]uint) +func (s *Storage) QueueSizes() map[string]int64 { + info := make(map[string]int64) for queue, c := range s.counters { info[queue] = c.distance() @@ -136,9 +136,9 @@ func (s *Storage) Close() (err error) { // State func (s *Storage) saveState() (err error) { - state := make(map[string]map[string]uint) + state := make(map[string]map[string]int64) for queue, ctr := range s.counters { - state[queue] = map[string]uint{ + state[queue] = map[string]int64{ "wi": ctr.write, "ri": ctr.read, } @@ -153,7 +153,7 @@ func (s *Storage) saveState() (err error) { func (s *Storage) loadState() (err error) { var ( jsn []byte - state = make(map[string]map[string]uint) + state = make(map[string]map[string]int64) ) if jsn, err = s.kyoto.Get([]byte(stateMetaKey)); err != nil { @@ -186,7 +186,9 @@ func (s *Storage) keepStatePersisted() { } } -func makeKey(queue string, index uint) []byte { - // TODO: There should be a faster way - return []byte(strings.Join([]string{queue, strconv.FormatUint(uint64(index), 10)}, "_")) +func makeKey(queue string, index int64) []byte { + return []byte(strings.Join([]string{ + queue, + strconv.FormatInt(index, 10), + }, "_")) }