From 5d8515403007b5e10fbd162c9cfe5de664a73330 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Tue, 9 Sep 2014 17:40:41 +0400 Subject: [PATCH] Storage subpackage --- storage/counter.go | 68 +++++++++++++++++++++ storage/storage.go | 144 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 storage/counter.go create mode 100644 storage/storage.go diff --git a/storage/counter.go b/storage/counter.go new file mode 100644 index 0000000..2aab039 --- /dev/null +++ b/storage/counter.go @@ -0,0 +1,68 @@ +package storage + +import ( + "sync" +) + +const ( + maxIndex = ^uint(0) // Max unit value +) + +type ( + // Counter is responsible for operating queue read and write indexes + counter struct { + write uint // Number of the record last written to the queue + read uint // Number of the record last read from the queue + // If write index is greater than read index then there are unread messages + // If write index is less tham read index then max index was reached + + mutex sync.Mutex + stream chan uint + streaming *sync.Cond + } +) + +func newCounter(wi, ri uint) (c *counter) { + m := &sync.Mutex{} + m.Lock() + + c = &counter{ + write: wi, + read: ri, + stream: make(chan uint), + streaming: sync.NewCond(m), + } + + go c.increment() + + return +} + +func (c *counter) tryWrite(fn func(i uint) bool) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if ok := fn(c.write + 1); ok { + c.write++ + c.streaming.Signal() + } +} + +func (c *counter) distance() uint { + d := c.write - c.read + if d < 0 { + d += maxIndex + } + return d +} + +func (c *counter) increment() { + for { + if c.distance() == 0 { + c.streaming.Wait() + } + + c.stream <- c.read + 1 + c.read++ + } +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..5e85f2f --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,144 @@ +package storage + +import ( + "encoding/json" + "strconv" + "strings" + "time" + + "bitbucket.org/ww/cabinet" +) + +const ( + stateMetaKey = "state" + stateSaveInterval = 1 // seconds +) + +type ( + Storage struct { + kyoto *cabinet.KCDB + counters map[string]*counter + } +) + +func New(path string) (s *Storage, err error) { + kyoto := cabinet.New() + if err = kyoto.Open(path, cabinet.KCOWRITER|cabinet.KCOCREATE); err != nil { + return + } + + s = &Storage{ + kyoto: kyoto, + counters: make(map[string]*counter), + } + + return +} + +func (s *Storage) Get(queue string, abort <-chan struct{}) (message []byte, ok bool) { + if _, ok := s.counters[queue]; !ok { + s.counters[queue] = newCounter(0, 0) + } + + var index uint + + select { + case index = <-s.counters[queue].stream: + case <-abort: + return + } + + key := makeKey(queue, index) + message, err := s.kyoto.Get(key) + if err != nil { + return + } + + if err = s.kyoto.Remove(key); err != nil { + return + } + ok = true + + return +} + +func (s *Storage) Put(queue string, message []byte) (err error) { + if _, ok := s.counters[queue]; !ok { + s.counters[queue] = newCounter(0, 0) + } + + s.counters[queue].tryWrite(func(index uint) bool { + key := makeKey(queue, index) + err = s.kyoto.Set(key, message) + + return (err == nil) + }) + + return +} + +func (s *Storage) Close() (err error) { + if err = s.kyoto.Sync(true); err != nil { + return + } + err = s.kyoto.Close() + + return +} + +// State + +func (s *Storage) saveState() (err error) { + state := make(map[string]map[string]uint) + for queue, ctr := range s.counters { + state[queue] = map[string]uint{ + "wi": ctr.write, + "ri": ctr.read, + } + } + + jsn, _ := json.Marshal(state) + err = s.kyoto.Set([]byte(stateMetaKey), jsn) + + return +} + +func (s *Storage) loadState() (err error) { + var ( + jsn []byte + state = make(map[string]map[string]uint) + ) + + if jsn, err = s.kyoto.Get([]byte(stateMetaKey)); err != nil { + return + } + if err = json.Unmarshal(jsn, &state); err != nil { + return + } + + for queue, meta := range state { + s.counters[queue] = newCounter(meta["wi"], meta["ri"]) + } + + return +} + +func (s *Storage) keepStatePersisted() { + t := time.NewTicker(stateSaveInterval * time.Second) + + for { + select { + case <-t.C: + if err := s.saveState(); err != nil { + panic("Failed to persist state") + } + if err := s.kyoto.Sync(false); err != nil { + panic("Failed to sync storage") + } + } + } +} + +func makeKey(queue string, index uint) []byte { + return []byte(strings.Join([]string{queue, strconv.FormatUint(uint64(index), 10)}, "_")) +}