1
0
Fork 0

Fix queue

This commit is contained in:
Gregory Eremin 2014-07-12 17:40:02 +07:00
parent 77d4f7f253
commit 31cd3431c3
2 changed files with 26 additions and 18 deletions

View File

@ -1,9 +1,5 @@
package main package main
import (
"github.com/stvp/rollbar"
)
type ( type (
Queue struct { Queue struct {
Name string Name string
@ -16,26 +12,35 @@ var (
) )
func (q *Queue) Push(msg Message) { func (q *Queue) Push(msg Message) {
saver <- Payload{Queue: q, Message: msg} saver <- Payload{
Queue: q,
Message: msg,
}
} }
func (q *Queue) Fetch() (Message, error) { func (q *Queue) TryFetch() (Message, bool) {
i := q.Counter.Next() if q.Counter.Distance() > 0 {
return q.Fetch()
} else {
return Message{}, false
}
}
func (q *Queue) Fetch() (Message, bool) {
i := q.Counter.Read()
key := NewKey(q.Name, i) key := NewKey(q.Name, i)
msg, err := storage.Get(key) msg, err := storage.Get(key)
if err != nil { if err != nil {
rollbar.Error("error", err) Error(err, "Failed to read record '%s'", key)
return msg, err
} }
defer func() { err = storage.Remove(key)
if err := storage.Remove(key); err != nil { if err != nil {
rollbar.Error("error", err) Error(err, "Failed to delete record '%s'", key)
} }
}()
return msg, nil return msg, (err == nil)
} }
func GetQueue(name string) *Queue { func GetQueue(name string) *Queue {
@ -46,5 +51,8 @@ func GetQueue(name string) *Queue {
} }
func RegisterQueue(name string, wi, ri uint) { func RegisterQueue(name string, wi, ri uint) {
queues[name] = &Queue{Name: name, Counter: NewCounter(wi, ri)} queues[name] = &Queue{
Name: name,
Counter: NewCounter(wi, ri),
}
} }

View File

@ -20,8 +20,8 @@ func SaveState() {
state := make(ServerState) state := make(ServerState)
for _, q := range queues { for _, q := range queues {
state[q.Name] = QueueState{ state[q.Name] = QueueState{
"wi": q.Counter.Write, "wi": q.Counter.WriteIndex,
"ri": q.Counter.Read, "ri": q.Counter.ReadIndex,
} }
} }