1
0
Fork 0

Move client and CLI to the main repo

This commit is contained in:
Gregory Eremin 2014-09-16 15:41:48 +04:00
parent a06b9f36bf
commit a80bb986f2
3 changed files with 283 additions and 0 deletions

101
cli/main.go Normal file
View File

@ -0,0 +1,101 @@
package main
import (
"fmt"
"io/ioutil"
"os"
"strings"
"time"
"github.com/codegangsta/cli"
"github.com/localhots/burlesque-golang/client"
)
func main() {
var bsq *client.Client
app := cli.NewApp()
app.Name = "Burlesque API client"
app.Usage = "Usage details here"
app.Version = "1.0.0"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "host",
Value: "127.0.0.1",
Usage: "Burlesque server host",
},
cli.IntFlag{
Name: "port",
Value: 4401,
Usage: "Burlesque server port",
},
cli.IntFlag{
Name: "timeout",
Value: 60,
Usage: "Subscription timeout in seconds",
},
}
app.Before = func(c *cli.Context) (err error) {
cfg := &client.Config{
Host: c.String("host"),
Port: c.Int("port"),
Timeout: time.Duration(c.Int("timeout")) * time.Second,
}
bsq = client.NewClient(cfg)
return
}
app.Commands = []cli.Command{
{
Name: "pub",
Usage: "Publish a message to a queue",
Action: func(c *cli.Context) {
var msg *client.Message
switch len(c.Args()) {
case 2:
msg = &client.Message{
Queue: c.Args()[0],
Body: []byte(c.Args()[1]),
}
case 1:
msg = &client.Message{
Queue: c.Args()[0],
}
var err error
msg.Body, err = ioutil.ReadAll(os.Stdin)
if err != nil {
panic(err)
}
case 0:
fmt.Println(app.Usage)
return
}
ok := bsq.Publish(msg)
if ok {
fmt.Printf("Message successfully published to queue %q\n", msg.Queue)
} else {
fmt.Printf("Failed to publish message to queue %q\n", msg.Queue)
}
},
},
{
Name: "sub",
Usage: "Subscribe for message from queue",
Action: func(c *cli.Context) {
msg := bsq.Subscribe(c.Args()...)
if msg != nil {
fmt.Println(string(msg.Body))
} else {
fmt.Printf("Failed to recieve message from queues %s\n", strings.Join(c.Args(), ", "))
}
},
},
}
app.CommandNotFound = func(c *cli.Context, cmd string) {
}
app.Run(os.Args)
}

163
client/client.go Normal file
View File

@ -0,0 +1,163 @@
package client
import (
"net"
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"strconv"
"strings"
)
const (
publishEndpoint = "/publish"
subscribeEndpoint = "/subscribe"
statusEndpoit = "/status"
debugEndpoint = "/debug"
)
type (
Client struct {
Config *Config
httpClient *http.Client
}
Message struct {
Queue string
Body []byte
}
QueueInfo struct {
Name string
Messages int
Subscribers int
}
DebugInfo struct {
Version string `json:"version"`
Goroutines int `json:"goroutines"`
KyotoCabinet map[string]interface{} `json:"kyoto_cabinet"`
}
)
func NewClient(c *Config) *Client {
return &Client{
Config: c,
httpClient: &http.Client{},
}
}
func (c *Client) Publish(m *Message) (ok bool) {
rdr := bytes.NewReader(m.Body)
url := c.url(publishEndpoint, "?queue=", m.Queue)
res, err := http.Post(url, "text/plain", rdr)
if err != nil {
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return
}
ok = (string(body) == "OK")
return
}
func (c *Client) Subscribe(queues ...string) (m *Message) {
url := c.url(subscribeEndpoint, "?queues=", strings.Join(queues, ","))
transport := http.Transport{
Dial: net.DialTimeout(network, addr, timeout),
}
rdr := bytes.NewReader([]byte)
req := http.NewRequest("GET", url, rdr)
req.
res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return
}
m = &Message{
Queue: res.Header.Get("Queue"),
Body: body,
}
return
}
func (c *Client) Status() (stat []*QueueInfo) {
url := c.url(statusEndpoit)
res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return
}
tmp := make(map[string]map[string]int)
if err := json.Unmarshal(body, &tmp); err != nil {
return
}
for queue, info := range tmp {
qi := &QueueInfo{
Name: queue,
Messages: info["messages"],
Subscribers: info["subscribers"],
}
stat = append(stat, qi)
}
return
}
func (c *Client) Debug() *DebugInfo {
url := c.url(debugEndpoint)
res, err := http.Get(url)
if err != nil {
return nil
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil
}
dbg := DebugInfo{}
if err := json.Unmarshal(body, &dbg); err != nil {
return nil
}
return &dbg
}
func (c *Client) url(path ...string) string {
parts := []string{"http://", c.Config.Host, ":", strconv.Itoa(c.Config.Port)}
parts = append(parts, path...)
return strings.Join(parts, "")
}
func TimeoutDialer(cTimeout time.Duration, rwTimeout time.Duration) func(net, addr string) (c net.Conn, err error) {
return func(netw, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(netw, addr, cTimeout)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(rwTimeout))
return conn, nil
}
}

19
client/config.go Normal file
View File

@ -0,0 +1,19 @@
package client
import (
"time"
)
type (
Config struct {
Host string
Port int
Timeout time.Duration
}
)
func (c *Config) UseDefaults() {
c.Host = "127.0.0.1"
c.Port = 4401
c.Timeout = 60 * time.Second
}