1
0
Fork 0
burlesque/queue.go

76 lines
1.2 KiB
Go
Raw Normal View History

2014-07-10 12:19:39 +00:00
package main
type (
Queue struct {
Name string
2014-07-16 17:45:48 +00:00
Counter *Counter
2014-07-10 12:19:39 +00:00
}
)
// queues maps a queue name to its registered Queue. Entries are added by
// RegisterQueue and looked up by GetQueue.
// NOTE(review): no locking around this map is visible in this file — confirm
// that callers serialize access.
var queues = make(map[string]*Queue)
2014-07-16 17:47:08 +00:00
func (q *Queue) Push(msg Message) bool {
var err error
q.Counter.Write(func(i uint) bool {
key := NewKey(q.Name, i)
err = storage.Set(key, msg)
if err != nil {
Error(err, "Failed to write %d bytes to record '%s'", len(msg), key)
}
return (err == nil)
})
return (err == nil)
2014-07-10 12:19:39 +00:00
}
2014-07-16 17:50:05 +00:00
func (q *Queue) TryFetch(abort chan bool) (Message, bool) {
2014-07-12 10:40:02 +00:00
if q.Counter.Distance() > 0 {
2014-07-16 17:50:05 +00:00
return q.Fetch(abort)
2014-07-12 10:40:02 +00:00
} else {
return Message{}, false
}
}
2014-07-16 17:50:05 +00:00
func (q *Queue) Fetch(abort chan bool) (Message, bool) {
2014-07-16 18:27:30 +00:00
var i uint
2014-07-10 12:19:39 +00:00
2014-07-16 18:27:30 +00:00
select {
case i = <-q.Counter.Read:
case <-abort:
return Message{}, false
}
key := NewKey(q.Name, i)
2014-07-10 12:19:39 +00:00
msg, err := storage.Get(key)
if err != nil {
2014-07-12 10:40:02 +00:00
Error(err, "Failed to read record '%s'", key)
return msg, false
2014-07-10 12:19:39 +00:00
}
2014-07-12 10:40:02 +00:00
err = storage.Remove(key)
if err != nil {
Error(err, "Failed to delete record '%s'", key)
return msg, false
2014-07-12 10:40:02 +00:00
}
2014-07-10 12:19:39 +00:00
return msg, true
2014-07-10 12:19:39 +00:00
}
// GetQueue returns the queue registered under name. If there is no entry for
// name, it first registers one via RegisterQueue(name, 0, 0).
func GetQueue(name string) *Queue {
	if existing, found := queues[name]; found {
		return existing
	}

	RegisterQueue(name, 0, 0)
	return queues[name]
}
func RegisterQueue(name string, wi, ri uint) {
2014-07-12 10:40:02 +00:00
queues[name] = &Queue{
Name: name,
Counter: NewCounter(wi, ri),
}
2014-07-10 12:19:39 +00:00
}