diff --git a/hub/hub.go b/hub/hub.go index a02300f..9388f6d 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -65,18 +65,16 @@ func (h *Hub) Sub(s *Subscription) { h.subscribers = append(h.subscribers, s) } -func (h *Hub) Flush(queues []string) (messages []MessageDump) { +func (h *Hub) Flush(queues []string) []MessageDump { + messages := []MessageDump{} + for _, queue := range queues { for _, msg := range h.storage.Flush(queue) { messages = append(messages, MessageDump{queue, string(msg)}) } } - if messages == nil { - messages = []MessageDump{} - } - - return + return messages } func (h *Hub) Info() map[string]map[string]uint { @@ -96,7 +94,7 @@ func (h *Hub) Info() map[string]map[string]uint { if _, ok := info[queue]["subscriptions"]; !ok { info[queue]["subscriptions"] = 0 } - info[queue]["subscriptions"] += 1 + info[queue]["subscriptions"]++ } } diff --git a/server/server.go b/server/server.go index 530739e..f951b7f 100644 --- a/server/server.go +++ b/server/server.go @@ -47,6 +47,8 @@ func (s *Server) Start() { func (s *Server) statusHandler(w http.ResponseWriter, r *http.Request) { info := s.hub.Info() jsn, _ := json.Marshal(info) + + w.Header().Set("Content-Type", "application/json") w.Write(jsn) } @@ -56,8 +58,9 @@ func (s *Server) debugHandler(w http.ResponseWriter, r *http.Request) { info["gomaxprocs"] = runtime.GOMAXPROCS(-1) info["goroutines"] = runtime.NumGoroutine() info["kyoto_cabinet"] = s.hub.StorageInfo() - jsn, _ := json.Marshal(info) + + w.Header().Set("Content-Type", "application/json") w.Write(jsn) } @@ -102,7 +105,8 @@ func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) { func (s *Server) flushHandler(w http.ResponseWriter, r *http.Request) { queues := strings.Split(r.FormValue("queues"), ",") messages := s.hub.Flush(queues) - jsn, _ := json.Marshal(messages) + + w.Header().Set("Content-Type", "application/json") w.Write(jsn) } diff --git a/storage/counter.go b/storage/counter.go index 2aab039..5a83bf1 100644 --- a/storage/counter.go +++ b/storage/counter.go @@ -22,11 +22,11 @@ type ( } ) -func newCounter(wi, ri uint) (c *counter) { +func newCounter(wi, ri uint) *counter { m := &sync.Mutex{} m.Lock() - c = &counter{ + c := &counter{ write: wi, read: ri, stream: make(chan uint), @@ -35,7 +35,7 @@ func newCounter(wi, ri uint) (c *counter) { go c.increment() - return + return c } func (c *counter) tryWrite(fn func(i uint) bool) {