diff --git a/queue.go b/queue.go index 1758c10..1df629f 100644 --- a/queue.go +++ b/queue.go @@ -1,9 +1,5 @@ package main -import ( - "github.com/stvp/rollbar" -) - type ( Queue struct { Name string @@ -16,26 +12,35 @@ var ( ) func (q *Queue) Push(msg Message) { - saver <- Payload{Queue: q, Message: msg} + saver <- Payload{ + Queue: q, + Message: msg, + } } -func (q *Queue) Fetch() (Message, error) { - i := q.Counter.Next() +func (q *Queue) TryFetch() (Message, bool) { + if q.Counter.Distance() > 0 { + return q.Fetch() + } else { + return Message{}, false + } +} + +func (q *Queue) Fetch() (Message, bool) { + i := q.Counter.Read() key := NewKey(q.Name, i) msg, err := storage.Get(key) if err != nil { - rollbar.Error("error", err) - return msg, err + Error(err, "Failed to read record '%s'", key) } - defer func() { - if err := storage.Remove(key); err != nil { - rollbar.Error("error", err) - } - }() + err = storage.Remove(key) + if err != nil { + Error(err, "Failed to delete record '%s'", key) + } - return msg, nil + return msg, (err == nil) } func GetQueue(name string) *Queue { @@ -46,5 +51,8 @@ func GetQueue(name string) *Queue { } func RegisterQueue(name string, wi, ri uint) { - queues[name] = &Queue{Name: name, Counter: NewCounter(wi, ri)} + queues[name] = &Queue{ + Name: name, + Counter: NewCounter(wi, ri), + } } diff --git a/state.go b/state.go index 1b4b670..322621c 100644 --- a/state.go +++ b/state.go @@ -20,8 +20,8 @@ func SaveState() { state := make(ServerState) for _, q := range queues { state[q.Name] = QueueState{ - "wi": q.Counter.Write, - "ri": q.Counter.Read, + "wi": q.Counter.WriteIndex, + "ri": q.Counter.ReadIndex, } }