1
0
Fork 0

Fix counter increment condition

This commit is contained in:
Gregory Eremin 2014-07-12 17:59:15 +07:00
parent da544b85b5
commit bd561cd73a
2 changed files with 10 additions and 5 deletions

View File

@ -12,10 +12,11 @@ var (
) )
func (q *Queue) Push(msg Message) { func (q *Queue) Push(msg Message) {
saver <- Payload{ p := Payload{
Queue: q, Queue: q,
Message: msg, Message: msg,
} }
Persist(p)
} }
func (q *Queue) TryFetch() (Message, bool) { func (q *Queue) TryFetch() (Message, bool) {

View File

@ -16,8 +16,8 @@ type (
) )
var ( var (
storage = cabinet.New() storage = cabinet.New()
saver = make(chan Payload, 1000) payloads = make(chan Payload, 1000)
) )
func NewKey(queue string, index uint) Key { func NewKey(queue string, index uint) Key {
@ -33,9 +33,13 @@ func SetupStorage() {
} }
} }
func Persist(p Payload) {
payloads <- p
}
func PersistMessages() { func PersistMessages() {
for { for {
p := <-saver p := <-payloads
p.Queue.Counter.Write(func(i uint) bool { p.Queue.Counter.Write(func(i uint) bool {
key := NewKey(p.Queue.Name, i) key := NewKey(p.Queue.Name, i)
@ -44,7 +48,7 @@ func PersistMessages() {
Error(err, "Failed to write %d bytes to record '%s'", len(p.Message), key) Error(err, "Failed to write %d bytes to record '%s'", len(p.Message), key)
} }
return (err != nil) return (err == nil)
}) })
} }
} }