diff --git a/client/client.go b/client/client.go index f5060c5..54796fa 100644 --- a/client/client.go +++ b/client/client.go @@ -14,6 +14,7 @@ import ( const ( publishEndpoint = "/publish" subscribeEndpoint = "/subscribe" + flushEndpoint = "/flush" statusEndpoit = "/status" debugEndpoint = "/debug" ) @@ -94,37 +95,35 @@ func (c *Client) Publish(m *Message) bool { func (c *Client) Subscribe(queues ...string) *Message { url := c.url(subscribeEndpoint, "?queues=", strings.Join(queues, ",")) - - res, err := c.httpClient.Get(url) - if err != nil { - return nil - } - - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil - } - res.Body.Close() + head, body := c.get(url) return &Message{ - Queue: res.Header.Get("Queue"), + Queue: head.Get("Queue"), Body: body, } } -func (c *Client) Status() (stat []*QueueInfo) { - url := c.url(statusEndpoit) - res, err := http.Get(url) - if err != nil { +func (c *Client) Flush(queues ...string) (messages []*Message) { + url := c.url(flushEndpoint, "?queues=", strings.Join(queues, ",")) + _, body := c.get(url) + + var tmp []map[string]string + if err := json.Unmarshal(body, &tmp); err != nil { return } - defer res.Body.Close() - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return + messages = []*Message{} + for _, msg := range tmp { + messages = append(messages, &Message{msg["queue"], []byte(msg["message"])}) } + return +} + +func (c *Client) Status() (stat []*QueueInfo) { + url := c.url(statusEndpoit) + _, body := c.get(url) + tmp := make(map[string]map[string]int) if err := json.Unmarshal(body, &tmp); err != nil { return @@ -144,16 +143,7 @@ func (c *Client) Status() (stat []*QueueInfo) { func (c *Client) Debug() *DebugInfo { url := c.url(debugEndpoint) - res, err := http.Get(url) - if err != nil { - return nil - } - - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil - } - res.Body.Close() + _, body := c.get(url) var dbg DebugInfo if err := json.Unmarshal(body, &dbg); err != nil { @@ -163,6 +153,21 @@ func (c *Client) Debug() *DebugInfo { return &dbg } +func (c *Client) get(url string) (http.Header, []byte) { + res, err := http.Get(url) + if err != nil { + return nil, nil + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, nil + } + res.Body.Close() + + return res.Header, body +} + func (c *Client) url(path ...string) string { parts := []string{"http://", c.Config.Host, ":", strconv.Itoa(c.Config.Port)} parts = append(parts, path...)