diff --git a/counter.go b/counter.go index 5971fc4..b0acfaf 100644 --- a/counter.go +++ b/counter.go @@ -18,20 +18,22 @@ type ( mutex sync.Mutex stream chan uint - streaming bool + streaming *sync.Cond } ) -func NewCounter(wi, ri uint) Counter { - c := Counter{ +func NewCounter(wi, ri uint) *Counter { + m := &sync.Mutex{} + m.Lock() + + c := &Counter{ WriteIndex: wi, ReadIndex: ri, stream: make(chan uint), - streaming: false, - } - if c.Distance() > 0 { - go c.Stream() + streaming: sync.NewCond(m), } + + go c.Stream() return c } @@ -42,9 +44,7 @@ func (c *Counter) Write(proc func(i uint) bool) { ok := proc(c.WriteIndex + 1) if ok { c.WriteIndex++ - if !c.streaming { - go c.Stream() - } + c.streaming.Signal() } } @@ -61,10 +61,12 @@ func (c *Counter) Distance() uint { } func (c *Counter) Stream() { - c.streaming = true - for c.WriteIndex > c.ReadIndex { + for { + if c.Distance() == 0 { + c.streaming.Wait() + } + c.stream <- c.ReadIndex + 1 c.ReadIndex++ } - c.streaming = false }