1
0
Fork 0

Client flush func

This commit is contained in:
Gregory Eremin 2014-09-24 20:13:29 +04:00
parent 185ebcd7a0
commit 1a4f3aaa4c
1 changed files with 35 additions and 30 deletions

View File

@ -14,6 +14,7 @@ import (
const ( const (
publishEndpoint = "/publish" publishEndpoint = "/publish"
subscribeEndpoint = "/subscribe" subscribeEndpoint = "/subscribe"
flushEndpoint = "/flush"
statusEndpoit = "/status" statusEndpoit = "/status"
debugEndpoint = "/debug" debugEndpoint = "/debug"
) )
@ -94,37 +95,35 @@ func (c *Client) Publish(m *Message) bool {
func (c *Client) Subscribe(queues ...string) *Message { func (c *Client) Subscribe(queues ...string) *Message {
url := c.url(subscribeEndpoint, "?queues=", strings.Join(queues, ",")) url := c.url(subscribeEndpoint, "?queues=", strings.Join(queues, ","))
head, body := c.get(url)
res, err := c.httpClient.Get(url)
if err != nil {
return nil
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil
}
res.Body.Close()
return &Message{ return &Message{
Queue: res.Header.Get("Queue"), Queue: head.Get("Queue"),
Body: body, Body: body,
} }
} }
func (c *Client) Status() (stat []*QueueInfo) { func (c *Client) Flush(queues ...string) (messages []*Message) {
url := c.url(statusEndpoit) url := c.url(flushEndpoint, "?queues=", strings.Join(queues, ","))
res, err := http.Get(url) _, body := c.get(url)
if err != nil {
var tmp []map[string]string
if err := json.Unmarshal(body, &tmp); err != nil {
return return
} }
defer res.Body.Close() messages = []*Message{}
body, err := ioutil.ReadAll(res.Body) for _, msg := range tmp {
if err != nil { messages = append(messages, &Message{msg["queue"], []byte(msg["message"])})
return
} }
return
}
func (c *Client) Status() (stat []*QueueInfo) {
url := c.url(statusEndpoit)
_, body := c.get(url)
tmp := make(map[string]map[string]int) tmp := make(map[string]map[string]int)
if err := json.Unmarshal(body, &tmp); err != nil { if err := json.Unmarshal(body, &tmp); err != nil {
return return
@ -144,16 +143,7 @@ func (c *Client) Status() (stat []*QueueInfo) {
func (c *Client) Debug() *DebugInfo { func (c *Client) Debug() *DebugInfo {
url := c.url(debugEndpoint) url := c.url(debugEndpoint)
res, err := http.Get(url) _, body := c.get(url)
if err != nil {
return nil
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil
}
res.Body.Close()
var dbg DebugInfo var dbg DebugInfo
if err := json.Unmarshal(body, &dbg); err != nil { if err := json.Unmarshal(body, &dbg); err != nil {
@ -163,6 +153,21 @@ func (c *Client) Debug() *DebugInfo {
return &dbg return &dbg
} }
func (c *Client) get(url string) (http.Header, []byte) {
res, err := http.Get(url)
if err != nil {
return nil, nil
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, nil
}
res.Body.Close()
return res.Header, body
}
func (c *Client) url(path ...string) string { func (c *Client) url(path ...string) string {
parts := []string{"http://", c.Config.Host, ":", strconv.Itoa(c.Config.Port)} parts := []string{"http://", c.Config.Host, ":", strconv.Itoa(c.Config.Port)}
parts = append(parts, path...) parts = append(parts, path...)