Storage subpackage
This commit is contained in:
parent
49acb0076f
commit
5d85154030
|
@ -0,0 +1,68 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
maxIndex = ^uint(0) // Max unit value
|
||||
)
|
||||
|
||||
type (
|
||||
// Counter is responsible for operating queue read and write indexes
|
||||
counter struct {
|
||||
write uint // Number of the record last written to the queue
|
||||
read uint // Number of the record last read from the queue
|
||||
// If write index is greater than read index then there are unread messages
|
||||
// If write index is less tham read index then max index was reached
|
||||
|
||||
mutex sync.Mutex
|
||||
stream chan uint
|
||||
streaming *sync.Cond
|
||||
}
|
||||
)
|
||||
|
||||
func newCounter(wi, ri uint) (c *counter) {
|
||||
m := &sync.Mutex{}
|
||||
m.Lock()
|
||||
|
||||
c = &counter{
|
||||
write: wi,
|
||||
read: ri,
|
||||
stream: make(chan uint),
|
||||
streaming: sync.NewCond(m),
|
||||
}
|
||||
|
||||
go c.increment()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *counter) tryWrite(fn func(i uint) bool) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
if ok := fn(c.write + 1); ok {
|
||||
c.write++
|
||||
c.streaming.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *counter) distance() uint {
|
||||
d := c.write - c.read
|
||||
if d < 0 {
|
||||
d += maxIndex
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func (c *counter) increment() {
|
||||
for {
|
||||
if c.distance() == 0 {
|
||||
c.streaming.Wait()
|
||||
}
|
||||
|
||||
c.stream <- c.read + 1
|
||||
c.read++
|
||||
}
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"bitbucket.org/ww/cabinet"
|
||||
)
|
||||
|
||||
const (
|
||||
stateMetaKey = "state"
|
||||
stateSaveInterval = 1 // seconds
|
||||
)
|
||||
|
||||
type (
|
||||
Storage struct {
|
||||
kyoto *cabinet.KCDB
|
||||
counters map[string]*counter
|
||||
}
|
||||
)
|
||||
|
||||
func New(path string) (s *Storage, err error) {
|
||||
kyoto := cabinet.New()
|
||||
if err = kyoto.Open(path, cabinet.KCOWRITER|cabinet.KCOCREATE); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
s = &Storage{
|
||||
kyoto: kyoto,
|
||||
counters: make(map[string]*counter),
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Storage) Get(queue string, abort <-chan struct{}) (message []byte, ok bool) {
|
||||
if _, ok := s.counters[queue]; !ok {
|
||||
s.counters[queue] = newCounter(0, 0)
|
||||
}
|
||||
|
||||
var index uint
|
||||
|
||||
select {
|
||||
case index = <-s.counters[queue].stream:
|
||||
case <-abort:
|
||||
return
|
||||
}
|
||||
|
||||
key := makeKey(queue, index)
|
||||
message, err := s.kyoto.Get(key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = s.kyoto.Remove(key); err != nil {
|
||||
return
|
||||
}
|
||||
ok = true
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Storage) Put(queue string, message []byte) (err error) {
|
||||
if _, ok := s.counters[queue]; !ok {
|
||||
s.counters[queue] = newCounter(0, 0)
|
||||
}
|
||||
|
||||
s.counters[queue].tryWrite(func(index uint) bool {
|
||||
key := makeKey(queue, index)
|
||||
err = s.kyoto.Set(key, message)
|
||||
|
||||
return (err == nil)
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Storage) Close() (err error) {
|
||||
if err = s.kyoto.Sync(true); err != nil {
|
||||
return
|
||||
}
|
||||
err = s.kyoto.Close()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// State
|
||||
|
||||
func (s *Storage) saveState() (err error) {
|
||||
state := make(map[string]map[string]uint)
|
||||
for queue, ctr := range s.counters {
|
||||
state[queue] = map[string]uint{
|
||||
"wi": ctr.write,
|
||||
"ri": ctr.read,
|
||||
}
|
||||
}
|
||||
|
||||
jsn, _ := json.Marshal(state)
|
||||
err = s.kyoto.Set([]byte(stateMetaKey), jsn)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Storage) loadState() (err error) {
|
||||
var (
|
||||
jsn []byte
|
||||
state = make(map[string]map[string]uint)
|
||||
)
|
||||
|
||||
if jsn, err = s.kyoto.Get([]byte(stateMetaKey)); err != nil {
|
||||
return
|
||||
}
|
||||
if err = json.Unmarshal(jsn, &state); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for queue, meta := range state {
|
||||
s.counters[queue] = newCounter(meta["wi"], meta["ri"])
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Storage) keepStatePersisted() {
|
||||
t := time.NewTicker(stateSaveInterval * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
if err := s.saveState(); err != nil {
|
||||
panic("Failed to persist state")
|
||||
}
|
||||
if err := s.kyoto.Sync(false); err != nil {
|
||||
panic("Failed to sync storage")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func makeKey(queue string, index uint) []byte {
|
||||
return []byte(strings.Join([]string{queue, strconv.FormatUint(uint64(index), 10)}, "_"))
|
||||
}
|
Loading…
Reference in New Issue