1
0
Fork 0

Convert numeric types to int64

This commit is contained in:
Gregory Eremin 2015-01-27 01:03:51 +07:00
parent e11882c556
commit c06d5d4f94
3 changed files with 36 additions and 25 deletions

View File

@ -37,6 +37,7 @@ func New(st *storage.Storage) *Hub {
func (h *Hub) Pub(queue string, msg []byte) bool { func (h *Hub) Pub(queue string, msg []byte) bool {
for _, s := range h.subscribers { for _, s := range h.subscribers {
if ok := s.Need(queue); ok { if ok := s.Need(queue); ok {
// Check if subscription is already served
select { select {
case <-s.Done(): case <-s.Done():
continue continue
@ -77,11 +78,11 @@ func (h *Hub) Flush(queues []string) []MessageDump {
return messages return messages
} }
func (h *Hub) Info() map[string]map[string]uint { func (h *Hub) Info() map[string]map[string]int64 {
info := make(map[string]map[string]uint) info := make(map[string]map[string]int64)
for queue, size := range h.storage.QueueSizes() { for queue, size := range h.storage.QueueSizes() {
info[queue] = map[string]uint{ info[queue] = map[string]int64{
"messages": size, "messages": size,
"subscriptions": 0, "subscriptions": 0,
} }
@ -89,7 +90,7 @@ func (h *Hub) Info() map[string]map[string]uint {
for _, sub := range h.subscribers { for _, sub := range h.subscribers {
for _, queue := range sub.Queues { for _, queue := range sub.Queues {
if _, ok := info[queue]; !ok { if _, ok := info[queue]; !ok {
info[queue] = map[string]uint{"messages": 0} info[queue] = map[string]int64{"messages": 0}
} }
if _, ok := info[queue]["subscriptions"]; !ok { if _, ok := info[queue]["subscriptions"]; !ok {
info[queue]["subscriptions"] = 0 info[queue]["subscriptions"] = 0

View File

@ -5,31 +5,31 @@ import (
) )
const ( const (
maxIndex = ^uint(0) // Max unit value maxIndex = ^int64(0) // Max unit value
) )
type ( type (
// Counter is responsible for operating queue read and write indexes // Counter is responsible for operating queue read and write indexes
counter struct { counter struct {
write uint // Number of the record last written to the queue write int64 // Number of the record last written to the queue
read uint // Number of the record last read from the queue read int64 // Number of the record last read from the queue
// If write index is greater than read index then there are unread messages // If write index is greater than read index then there are unread messages
// If write index is less tham read index then max index was reached // If write index is less tham read index then max index was reached
mutex sync.Mutex mutex sync.Mutex
stream chan uint stream chan int64
streaming *sync.Cond streaming *sync.Cond
} }
) )
func newCounter(wi, ri uint) *counter { func newCounter(wi, ri int64) *counter {
m := &sync.Mutex{} m := &sync.Mutex{}
m.Lock() m.Lock()
c := &counter{ c := &counter{
write: wi, write: wi,
read: ri, read: ri,
stream: make(chan uint), stream: make(chan int64),
streaming: sync.NewCond(m), streaming: sync.NewCond(m),
} }
@ -38,17 +38,20 @@ func newCounter(wi, ri uint) *counter {
return c return c
} }
func (c *counter) tryWrite(fn func(i uint) bool) { func (c *counter) tryWrite(fn func(i int64) bool) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if ok := fn(c.write + 1); ok { if ok := fn(c.write + 1); ok {
c.write++ if c.write++; c.write < 0 {
c.write = 0
}
c.streaming.Signal() c.streaming.Signal()
} }
} }
func (c *counter) distance() uint { func (c *counter) distance() int64 {
d := c.write - c.read d := c.write - c.read
if d < 0 { if d < 0 {
d += maxIndex d += maxIndex
@ -62,7 +65,12 @@ func (c *counter) increment() {
c.streaming.Wait() c.streaming.Wait()
} }
c.stream <- c.read + 1 next := c.read + 1
c.read++ if next < 0 {
next = 0
}
c.stream <- next
c.read = next
} }
} }

View File

@ -43,7 +43,7 @@ func (s *Storage) Get(queue string, done <-chan struct{}) (message []byte, ok bo
return return
} }
var index uint var index int64
select { select {
case index = <-s.counters[queue].stream: case index = <-s.counters[queue].stream:
case <-done: case <-done:
@ -68,7 +68,7 @@ func (s *Storage) Put(queue string, message []byte) (err error) {
s.counters[queue] = newCounter(0, 0) s.counters[queue] = newCounter(0, 0)
} }
s.counters[queue].tryWrite(func(index uint) bool { s.counters[queue].tryWrite(func(index int64) bool {
key := makeKey(queue, index) key := makeKey(queue, index)
err = s.kyoto.Set(key, message) err = s.kyoto.Set(key, message)
@ -92,8 +92,8 @@ func (s *Storage) Flush(queue string) (messages [][]byte) {
return return
} }
func (s *Storage) QueueSizes() map[string]uint { func (s *Storage) QueueSizes() map[string]int64 {
info := make(map[string]uint) info := make(map[string]int64)
for queue, c := range s.counters { for queue, c := range s.counters {
info[queue] = c.distance() info[queue] = c.distance()
@ -136,9 +136,9 @@ func (s *Storage) Close() (err error) {
// State // State
func (s *Storage) saveState() (err error) { func (s *Storage) saveState() (err error) {
state := make(map[string]map[string]uint) state := make(map[string]map[string]int64)
for queue, ctr := range s.counters { for queue, ctr := range s.counters {
state[queue] = map[string]uint{ state[queue] = map[string]int64{
"wi": ctr.write, "wi": ctr.write,
"ri": ctr.read, "ri": ctr.read,
} }
@ -153,7 +153,7 @@ func (s *Storage) saveState() (err error) {
func (s *Storage) loadState() (err error) { func (s *Storage) loadState() (err error) {
var ( var (
jsn []byte jsn []byte
state = make(map[string]map[string]uint) state = make(map[string]map[string]int64)
) )
if jsn, err = s.kyoto.Get([]byte(stateMetaKey)); err != nil { if jsn, err = s.kyoto.Get([]byte(stateMetaKey)); err != nil {
@ -186,7 +186,9 @@ func (s *Storage) keepStatePersisted() {
} }
} }
func makeKey(queue string, index uint) []byte { func makeKey(queue string, index int64) []byte {
// TODO: There should be a faster way return []byte(strings.Join([]string{
return []byte(strings.Join([]string{queue, strconv.FormatUint(uint64(index), 10)}, "_")) queue,
strconv.FormatInt(index, 10),
}, "_"))
} }