160 lines
3.5 KiB
Go
160 lines
3.5 KiB
Go
package server
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"text/template"
|
|
|
|
"github.com/GeertJohan/go.rice"
|
|
"github.com/KosyanMedia/burlesque/hub"
|
|
)
|
|
|
|
type (
|
|
Server struct {
|
|
port int
|
|
hub *hub.Hub
|
|
dashboardTmpl string
|
|
}
|
|
)
|
|
|
|
// Version is the server version string reported by the debug endpoint and
// passed to the dashboard template.
const Version = "1.1.0"
|
|
|
|
func New(port int, h *hub.Hub) *Server {
|
|
s := Server{
|
|
port: port,
|
|
hub: h,
|
|
}
|
|
|
|
box := rice.MustFindBox("static")
|
|
http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(box.HTTPBox())))
|
|
|
|
http.HandleFunc("/status", s.statusHandler)
|
|
http.HandleFunc("/debug", s.debugHandler)
|
|
http.HandleFunc("/publish", s.pubHandler)
|
|
http.HandleFunc("/subscribe", s.subHandler)
|
|
http.HandleFunc("/flush", s.flushHandler)
|
|
http.HandleFunc("/dashboard", s.dashboardHandler)
|
|
|
|
s.dashboardTmpl, _ = box.String("dashboard.tmpl")
|
|
|
|
return &s
|
|
}
|
|
|
|
func (s *Server) Start() {
|
|
port := fmt.Sprintf(":%d", s.port)
|
|
if err := http.ListenAndServe(port, nil); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (s *Server) statusHandler(w http.ResponseWriter, r *http.Request) {
|
|
var (
|
|
res = map[string]map[string]interface{}{}
|
|
info = s.hub.Info()
|
|
withRates = (r.FormValue("rates") != "")
|
|
)
|
|
|
|
for queue, meta := range info {
|
|
res[queue] = map[string]interface{}{}
|
|
|
|
for key, val := range meta {
|
|
res[queue][key] = val
|
|
}
|
|
if withRates {
|
|
inRate, outRate := s.hub.Rates(queue)
|
|
inHist, outHist := s.hub.RateHistory(queue)
|
|
res[queue]["in_rate"] = inRate
|
|
res[queue]["out_rate"] = outRate
|
|
res[queue]["in_rate_history"] = inHist
|
|
res[queue]["out_rate_history"] = outHist
|
|
}
|
|
}
|
|
|
|
jsn, _ := json.Marshal(res)
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.Write(jsn)
|
|
}
|
|
|
|
func (s *Server) debugHandler(w http.ResponseWriter, r *http.Request) {
|
|
info := make(map[string]interface{})
|
|
info["version"] = Version
|
|
info["gomaxprocs"] = runtime.GOMAXPROCS(-1)
|
|
info["goroutines"] = runtime.NumGoroutine()
|
|
info["kyoto_cabinet"] = s.hub.StorageInfo()
|
|
jsn, _ := json.Marshal(info)
|
|
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.Write(jsn)
|
|
}
|
|
|
|
func (s *Server) pubHandler(w http.ResponseWriter, r *http.Request) {
|
|
msg, _ := ioutil.ReadAll(r.Body)
|
|
if len(msg) == 0 {
|
|
msg = []byte(r.FormValue("msg"))
|
|
}
|
|
queue := r.FormValue("queue")
|
|
|
|
if ok := s.hub.Pub(queue, msg); ok {
|
|
w.Write([]byte("OK"))
|
|
} else {
|
|
http.Error(w, "FAIL", 500)
|
|
}
|
|
}
|
|
|
|
func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) {
|
|
queues := strings.Split(r.FormValue("queues"), ",")
|
|
sub := hub.NewSubscription(queues)
|
|
|
|
finished := make(chan struct{})
|
|
defer close(finished)
|
|
|
|
disconnected := w.(http.CloseNotifier).CloseNotify()
|
|
go func() {
|
|
select {
|
|
case <-disconnected:
|
|
case <-finished:
|
|
}
|
|
sub.Close()
|
|
}()
|
|
|
|
go s.hub.Sub(sub)
|
|
|
|
if res, ok := <-sub.Result(); ok {
|
|
w.Header().Set("Queue", res.Queue)
|
|
w.Write(res.Message)
|
|
}
|
|
}
|
|
|
|
func (s *Server) flushHandler(w http.ResponseWriter, r *http.Request) {
|
|
queues := strings.Split(r.FormValue("queues"), ",")
|
|
messages := s.hub.Flush(queues)
|
|
jsn, _ := json.Marshal(messages)
|
|
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.Write(jsn)
|
|
}
|
|
|
|
func (s *Server) dashboardHandler(w http.ResponseWriter, r *http.Request) {
|
|
tmpl := template.New("dashboard")
|
|
tmpl, _ = tmpl.Parse(s.dashboardTmpl)
|
|
|
|
w.Header().Set("Content-Type", "text/html; charset=utf8")
|
|
hostname, _ := os.Hostname()
|
|
if hostname == "" {
|
|
hostname = "Unknown Host"
|
|
}
|
|
|
|
tmpl.ExecuteTemplate(w, "dashboard", map[string]interface{}{
|
|
"version": Version,
|
|
"hostname": hostname,
|
|
"port": s.port,
|
|
})
|
|
}
|