diff --git a/queue.go b/queue.go index 1df629f..c93a5c9 100644 --- a/queue.go +++ b/queue.go @@ -12,10 +12,11 @@ var ( ) func (q *Queue) Push(msg Message) { - saver <- Payload{ + p := Payload{ Queue: q, Message: msg, } + Persist(p) } func (q *Queue) TryFetch() (Message, bool) { diff --git a/storage.go b/storage.go index 63f58e0..444e08c 100644 --- a/storage.go +++ b/storage.go @@ -16,8 +16,8 @@ type ( ) var ( - storage = cabinet.New() - saver = make(chan Payload, 1000) + storage = cabinet.New() + payloads = make(chan Payload, 1000) ) func NewKey(queue string, index uint) Key { @@ -33,9 +33,13 @@ func SetupStorage() { } } +func Persist(p Payload) { + payloads <- p +} + func PersistMessages() { for { - p := <-saver + p := <-payloads p.Queue.Counter.Write(func(i uint) bool { key := NewKey(p.Queue.Name, i) @@ -44,7 +48,7 @@ func PersistMessages() { Error(err, "Failed to write %d bytes to record '%s'", len(p.Message), key) } - return (err != nil) + return (err == nil) }) } }