Queue flushing

This commit is contained in:
2014-09-24 19:37:33 +04:00
parent 9d6d240bbc
commit 12b3a12c51
3 changed files with 41 additions and 0 deletions
+14
View File
@@ -78,6 +78,20 @@ func (s *Storage) Put(queue string, message []byte) (err error) {
return
}
func (s *Storage) Flush(queue string) (messages [][]byte) {
done := make(chan struct{})
for {
if msg, ok := s.Get(queue, done); ok {
messages = append(messages, msg)
} else {
return
}
}
return
}
func (s *Storage) QueueSizes() map[string]uint {
info := make(map[string]uint)