1
0
Fork 0

Upgraded Counter

This commit is contained in:
Gregory Eremin 2014-07-17 00:45:26 +07:00
parent 977f53ae5e
commit 4a597c15bd
1 changed files with 15 additions and 13 deletions

View File

@ -18,20 +18,22 @@ type (
mutex sync.Mutex mutex sync.Mutex
stream chan uint stream chan uint
streaming bool streaming *sync.Cond
} }
) )
func NewCounter(wi, ri uint) Counter { func NewCounter(wi, ri uint) *Counter {
c := Counter{ m := &sync.Mutex{}
m.Lock()
c := &Counter{
WriteIndex: wi, WriteIndex: wi,
ReadIndex: ri, ReadIndex: ri,
stream: make(chan uint), stream: make(chan uint),
streaming: false, streaming: sync.NewCond(m),
}
if c.Distance() > 0 {
go c.Stream()
} }
go c.Stream()
return c return c
} }
@ -42,9 +44,7 @@ func (c *Counter) Write(proc func(i uint) bool) {
ok := proc(c.WriteIndex + 1) ok := proc(c.WriteIndex + 1)
if ok { if ok {
c.WriteIndex++ c.WriteIndex++
if !c.streaming { c.streaming.Signal()
go c.Stream()
}
} }
} }
@ -61,10 +61,12 @@ func (c *Counter) Distance() uint {
} }
func (c *Counter) Stream() { func (c *Counter) Stream() {
c.streaming = true for {
for c.WriteIndex > c.ReadIndex { if c.Distance() == 0 {
c.streaming.Wait()
}
c.stream <- c.ReadIndex + 1 c.stream <- c.ReadIndex + 1
c.ReadIndex++ c.ReadIndex++
} }
c.streaming = false
} }