1
0
Fork 0
burlesque/storage/counter.go

77 lines
1.2 KiB
Go
Raw Permalink Normal View History

2014-09-09 13:40:41 +00:00
package storage
import (
"sync"
)
const (
2015-01-26 18:03:51 +00:00
maxIndex = ^int64(0) // Max unit value
2014-09-09 13:40:41 +00:00
)
type (
// Counter is responsible for operating queue read and write indexes
counter struct {
2015-01-26 18:03:51 +00:00
write int64 // Number of the record last written to the queue
read int64 // Number of the record last read from the queue
2014-09-09 13:40:41 +00:00
// If write index is greater than read index then there are unread messages
// If write index is less tham read index then max index was reached
mutex sync.Mutex
2015-01-26 18:03:51 +00:00
stream chan int64
2014-09-09 13:40:41 +00:00
streaming *sync.Cond
}
)
2015-01-26 18:03:51 +00:00
func newCounter(wi, ri int64) *counter {
2014-09-09 13:40:41 +00:00
m := &sync.Mutex{}
m.Lock()
c := &counter{
2014-09-09 13:40:41 +00:00
write: wi,
read: ri,
2015-01-26 18:03:51 +00:00
stream: make(chan int64),
2014-09-09 13:40:41 +00:00
streaming: sync.NewCond(m),
}
go c.increment()
return c
2014-09-09 13:40:41 +00:00
}
2015-01-26 18:03:51 +00:00
func (c *counter) tryWrite(fn func(i int64) bool) {
2014-09-09 13:40:41 +00:00
c.mutex.Lock()
defer c.mutex.Unlock()
if ok := fn(c.write + 1); ok {
2015-01-26 18:03:51 +00:00
if c.write++; c.write < 0 {
c.write = 0
}
2014-09-09 13:40:41 +00:00
c.streaming.Signal()
}
}
2015-01-26 18:03:51 +00:00
func (c *counter) distance() int64 {
2014-09-09 13:40:41 +00:00
d := c.write - c.read
if d < 0 {
d += maxIndex
}
return d
}
func (c *counter) increment() {
for {
if c.distance() == 0 {
c.streaming.Wait()
}
2015-01-26 18:03:51 +00:00
next := c.read + 1
if next < 0 {
next = 0
}
c.stream <- next
c.read = next
2014-09-09 13:40:41 +00:00
}
}