diff --git a/storage.go b/storage.go index bf556f9..c57d645 100644 --- a/storage.go +++ b/storage.go @@ -9,15 +9,10 @@ import ( type ( Message []byte Key []byte - Payload struct { - Queue *Queue - Message Message - } ) var ( - storage = cabinet.New() - payloads = make(chan Payload, 1000) + storage = cabinet.New() ) func NewKey(queue string, index uint) Key { @@ -33,22 +28,20 @@ func SetupStorage() { } } -func Persist(p Payload) { - payloads <- p -} +func CloseStorage() { + var err error -func PersistMessages() { - for { - p := <-payloads + err = storage.Sync(true) + if err != nil { + Error(err, "Failed to sync storage") + } else { + Log("Storage synchronized") + } - p.Queue.Counter.Write(func(i uint) bool { - key := NewKey(p.Queue.Name, i) - err := storage.Set(key, p.Message) - if err != nil { - Error(err, "Failed to write %d bytes to record '%s'", len(p.Message), key) - } - - return (err == nil) - }) + err = storage.Close() + if err != nil { + Error(err, "Failed to close storage") + } else { + Log("Storage closed") } }