1
0
Fork 0
burlesque/queue.go

76 lines
1.2 KiB
Go
Raw Normal View History

2014-07-10 12:19:39 +00:00
package main
type (
2014-09-09 08:20:40 +00:00
queue struct {
name string
counter *counter
2014-07-10 12:19:39 +00:00
}
)
var (
2014-09-09 08:20:40 +00:00
queues = make(map[string]*queue)
2014-07-10 12:19:39 +00:00
)
2014-09-09 08:20:40 +00:00
func (q *queue) push(msg message) bool {
2014-07-16 17:47:08 +00:00
var err error
2014-09-09 08:20:40 +00:00
q.counter.write(func(i uint) bool {
key := newKey(q.name, i)
2014-07-16 17:47:08 +00:00
err = storage.Set(key, msg)
if err != nil {
2014-09-09 08:20:40 +00:00
alert(err, "Failed to write %d bytes to record '%s'", len(msg), key)
2014-07-16 17:47:08 +00:00
}
return (err == nil)
})
return (err == nil)
2014-07-10 12:19:39 +00:00
}
2014-09-09 08:20:40 +00:00
func (q *queue) tryFetch(abort chan bool) (message, bool) {
if q.counter.distance() > 0 {
return q.fetch(abort)
2014-07-12 10:40:02 +00:00
} else {
2014-09-09 08:20:40 +00:00
return message{}, false
2014-07-12 10:40:02 +00:00
}
}
2014-09-09 08:20:40 +00:00
func (q *queue) fetch(abort chan bool) (message, bool) {
2014-07-16 18:27:30 +00:00
var i uint
2014-07-10 12:19:39 +00:00
2014-07-16 18:27:30 +00:00
select {
2014-09-09 08:20:40 +00:00
case i = <-q.counter.read:
2014-07-16 18:27:30 +00:00
case <-abort:
2014-09-09 08:20:40 +00:00
return message{}, false
2014-07-16 18:27:30 +00:00
}
2014-09-09 08:20:40 +00:00
k := newKey(q.name, i)
msg, err := storage.Get(k)
2014-07-10 12:19:39 +00:00
if err != nil {
2014-09-09 08:20:40 +00:00
alert(err, "Failed to read record '%s'", k)
return msg, false
2014-07-10 12:19:39 +00:00
}
2014-09-09 08:20:40 +00:00
err = storage.Remove(k)
2014-07-12 10:40:02 +00:00
if err != nil {
2014-09-09 08:20:40 +00:00
alert(err, "Failed to delete record '%s'", k)
return msg, false
2014-07-12 10:40:02 +00:00
}
2014-07-10 12:19:39 +00:00
return msg, true
2014-07-10 12:19:39 +00:00
}
2014-09-09 08:20:40 +00:00
func getQueue(name string) *queue {
2014-07-10 12:19:39 +00:00
if _, ok := queues[name]; !ok {
2014-09-09 08:20:40 +00:00
registerQueue(name, 0, 0)
2014-07-10 12:19:39 +00:00
}
return queues[name]
}
2014-09-09 08:20:40 +00:00
func registerQueue(name string, wi, ri uint) {
queues[name] = &queue{
name: name,
counter: newCounter(wi, ri),
2014-07-12 10:40:02 +00:00
}
2014-07-10 12:19:39 +00:00
}