77 lines
1.2 KiB
Go
77 lines
1.2 KiB
Go
package storage
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
const (
	// maxIndex is the largest value a queue index can hold (the maximum
	// int64). Advancing an index past it wraps the index back to 0.
	//
	// Note: ^int64(0) is -1, not the maximum, so the value is spelled out as
	// 2^63 - 1 instead.
	maxIndex int64 = 1<<63 - 1
)
|
|
|
|
// counter is responsible for operating queue read and write indexes.
//
// Advancing an index never yields a negative value: stepping past the largest
// int64 wraps the index back to 0.
//
// If the write index is ahead of the read index there are unread records.
// If the write index is less than the read index, the write index has wrapped
// past the maximum; distance accounts for that.
type counter struct {
	write int64 // Number of the record last written to the queue
	read  int64 // Number of the record last read from the queue

	mutex     sync.Mutex // guards write and read
	stream    chan int64 // unbuffered; increment sends each next read index here
	streaming *sync.Cond // bound to mutex; signalled by tryWrite after a successful write
}

// newCounter builds a counter starting at write index wi and read index ri
// and starts the background goroutine (increment) that feeds c.stream.
//
// The goroutine runs for the lifetime of the process; there is no stop
// mechanism.
func newCounter(wi, ri int64) *counter {
	c := &counter{
		write:  wi,
		read:   ri,
		stream: make(chan int64),
	}

	// The condition variable must share the mutex that protects write/read,
	// otherwise a Signal issued between the distance check and Wait is lost.
	c.streaming = sync.NewCond(&c.mutex)

	go c.increment()

	return c
}

// advanceIndex returns the index that follows i, wrapping to 0 when the
// increment overflows into the negative range.
func advanceIndex(i int64) int64 {
	if i++; i < 0 {
		return 0
	}
	return i
}

// tryWrite offers the next write index to fn while holding the counter lock.
// If fn reports success the write index is advanced to that value and the
// streaming goroutine is woken up; otherwise the counter is left untouched.
//
// fn runs with c.mutex held, so it must not call back into methods that
// lock the counter.
func (c *counter) tryWrite(fn func(i int64) bool) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Wrap before calling fn so that fn sees exactly the index that will be
	// stored, even when the write index is at its maximum.
	next := advanceIndex(c.write)
	if !fn(next) {
		return
	}

	c.write = next
	c.streaming.Signal()
}

// distance returns how many records have been written but not yet read.
//
// It does not lock the counter itself; callers that need a consistent
// snapshot must hold c.mutex.
func (c *counter) distance() int64 {
	// Indexes wrap from 2^63-1 back to 0, so the distance is the difference
	// modulo 2^63. Masking off the sign bit performs that reduction and
	// leaves non-negative differences unchanged.
	const indexMask = 1<<63 - 1
	return (c.write - c.read) & indexMask
}

// increment is the body of the streaming goroutine. It waits until there is
// at least one unread record, publishes the next read index on c.stream and,
// once a receiver has taken it, records that index as read.
func (c *counter) increment() {
	for {
		c.mutex.Lock()
		// Re-check the condition after every wake-up, as sync.Cond requires.
		for c.distance() == 0 {
			c.streaming.Wait()
		}
		next := advanceIndex(c.read)
		c.mutex.Unlock()

		// Send without holding the lock so writers are not blocked while no
		// receiver is ready.
		c.stream <- next

		c.mutex.Lock()
		c.read = next
		c.mutex.Unlock()
	}
}
|