From 12b3a12c51e4bbea83c251414b4515028de0ba09 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Wed, 24 Sep 2014 19:37:33 +0400 Subject: [PATCH] Queue flushing --- hub/hub.go | 18 ++++++++++++++++++ server/server.go | 9 +++++++++ storage/storage.go | 14 ++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/hub/hub.go b/hub/hub.go index a1072d2..a02300f 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -17,6 +17,10 @@ type ( Queue string Message []byte } + MessageDump struct { + Queue string `json:"queue"` + Message string `json:"message"` + } ) func New(st *storage.Storage) *Hub { @@ -61,6 +65,20 @@ func (h *Hub) Sub(s *Subscription) { h.subscribers = append(h.subscribers, s) } +func (h *Hub) Flush(queues []string) (messages []MessageDump) { + for _, queue := range queues { + for _, msg := range h.storage.Flush(queue) { + messages = append(messages, MessageDump{queue, string(msg)}) + } + } + + if messages == nil { + messages = []MessageDump{} + } + + return +} + func (h *Hub) Info() map[string]map[string]uint { info := make(map[string]map[string]uint) diff --git a/server/server.go b/server/server.go index b561572..530739e 100644 --- a/server/server.go +++ b/server/server.go @@ -32,6 +32,7 @@ func New(port int, h *hub.Hub) *Server { http.HandleFunc("/debug", s.debugHandler) http.HandleFunc("/publish", s.pubHandler) http.HandleFunc("/subscribe", s.subHandler) + http.HandleFunc("/flush", s.flushHandler) return &s } @@ -97,3 +98,11 @@ func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) { w.Write(res.Message) } } + +func (s *Server) flushHandler(w http.ResponseWriter, r *http.Request) { + queues := strings.Split(r.FormValue("queues"), ",") + messages := s.hub.Flush(queues) + + jsn, _ := json.Marshal(messages) + w.Write(jsn) +} diff --git a/storage/storage.go b/storage/storage.go index 1fea3b3..64af24f 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -78,6 +78,20 @@ func (s *Storage) Put(queue string, message []byte) (err error) { return } +func (s *Storage) Flush(queue string) (messages [][]byte) { + done := make(chan struct{}) + + for { + if msg, ok := s.Get(queue, done); ok { + messages = append(messages, msg) + } else { + return + } + } + + return +} + func (s *Storage) QueueSizes() map[string]uint { info := make(map[string]uint)