2014-07-10 12:19:39 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"github.com/ezotrank/cabinetgo"
|
2014-07-12 10:42:26 +00:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
2014-07-10 12:19:39 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type (
|
2014-07-12 10:42:26 +00:00
|
|
|
Message []byte
|
|
|
|
Key []byte
|
2014-07-10 12:19:39 +00:00
|
|
|
Payload struct {
|
|
|
|
Queue *Queue
|
|
|
|
Message Message
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
storage = cabinet.New()
|
|
|
|
saver = make(chan Payload, 1000)
|
|
|
|
)
|
|
|
|
|
2014-07-12 10:42:26 +00:00
|
|
|
func NewKey(queue string, index uint) Key {
|
|
|
|
istr := strconv.FormatUint(uint64(index), 10)
|
|
|
|
key := strings.Join([]string{queue, istr}, "_")
|
|
|
|
return Key(key)
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:19:39 +00:00
|
|
|
func SetupStorage() {
|
|
|
|
err := storage.Open(cfg.Storage, cabinet.KCOWRITER|cabinet.KCOCREATE)
|
|
|
|
if err != nil {
|
2014-07-12 10:42:26 +00:00
|
|
|
Error(err, "Failed to open database '%s'", cfg.Storage)
|
2014-07-10 12:19:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func PersistMessages() {
|
|
|
|
for {
|
2014-07-12 10:42:04 +00:00
|
|
|
p := <-saver
|
2014-07-10 12:19:39 +00:00
|
|
|
|
2014-07-12 10:42:04 +00:00
|
|
|
p.Queue.Counter.Write(func(i uint) bool {
|
|
|
|
key := NewKey(p.Queue.Name, i)
|
|
|
|
err := storage.Set(key, p.Message)
|
|
|
|
if err != nil {
|
|
|
|
Error(err, "Failed to write %d bytes to record '%s'", len(p.Message), key)
|
|
|
|
}
|
|
|
|
|
|
|
|
return (err != nil)
|
|
|
|
})
|
2014-07-10 12:19:39 +00:00
|
|
|
}
|
|
|
|
}
|