This commit is contained in:
Gregory Eremin 2014-09-11 14:27:55 +04:00
parent 3adaa805eb
commit d417fa653f
3 changed files with 87 additions and 110 deletions

View File

@ -1,18 +0,0 @@
package main
import (
"flag"
)
var (
config struct {
storage string
port int
}
)
func setupConfig() {
flag.StringVar(&config.storage, "storage", "-", "Kyoto Cabinet storage path (e.g. burlesque.kch#dfunit=8#msiz=512M)")
flag.IntVar(&config.port, "port", 4401, "Server HTTP port")
flag.Parse()
}

43
main.go
View File

@ -1,39 +1,48 @@
package main package main
import ( import (
"flag"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/KosyanMedia/burlesque/hub"
"github.com/KosyanMedia/burlesque/storage"
) )
const ( const (
version = "0.1.3" version = "0.2.0"
) )
func handleShutdown() { var (
theStorage *storage.Storage
theHub *hub.Hub
config struct {
storage string
port int
}
)
func main() {
flag.StringVar(&config.storage, "storage", "-", "Kyoto Cabinet storage path (e.g. burlesque.kch#dfunit=8#msiz=512M)")
flag.IntVar(&config.port, "port", 4401, "Server HTTP port")
flag.Parse()
theStorage, err := storage.New(config.storage)
if err != nil {
panic(err)
}
theHub = hub.New(theStorage)
ch := make(chan os.Signal) ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
go func() { go func() {
<-ch <-ch
theStorage.Close()
saveState()
log("State successfully persisted")
closeStorage()
log("Stopped")
os.Exit(0) os.Exit(0)
}() }()
}
func main() {
setupConfig()
setupLogging() setupLogging()
setupStorage()
setupServer()
handleShutdown()
loadState()
go keepStatePersisted()
startServer() startServer()
} }

136
server.go
View File

@ -1,124 +1,110 @@
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"runtime"
"strconv"
"strings" "strings"
"github.com/KosyanMedia/burlesque/hub"
) )
func startServer() { func startServer() {
port := fmt.Sprintf(":%d", config.port) http.HandleFunc("/status", statusHandler)
err := http.ListenAndServe(port, nil) http.HandleFunc("/debug", debugHandler)
if err != nil { http.HandleFunc("/publish", pubHandler)
http.HandleFunc("/subscribe", subHandler)
if err := http.ListenAndServe(fmt.Sprintf(":%d", config.port), nil); err != nil {
alert(err, "Error starting server on port %d", config.port) alert(err, "Error starting server on port %d", config.port)
} }
} }
func statusHandler(w http.ResponseWriter, r *http.Request) { func statusHandler(w http.ResponseWriter, r *http.Request) {
info := make(map[string]map[string]uint) // info := make(map[string]map[string]uint)
for _, q := range queues { // for _, q := range queues {
info[q.name] = map[string]uint{ // info[q.name] = map[string]uint{
"messages": q.counter.distance(), // // "messages": q.counter.distance(),
"subscriptions": 0, // "subscriptions": 0,
} // }
} // }
for _, r := range pool.requests { // for _, r := range pool.requests {
for _, q := range r.queues { // for _, q := range r.queues {
info[q]["subscriptions"]++ // info[q]["subscriptions"]++
} // }
} // }
jsn, _ := json.Marshal(info) // jsn, _ := json.Marshal(info)
w.Write(jsn) // w.Write(jsn)
} }
func debugHandler(w http.ResponseWriter, r *http.Request) { func debugHandler(w http.ResponseWriter, r *http.Request) {
info := make(map[string]interface{}) // info := make(map[string]interface{})
info["version"] = version // info["version"] = version
info["goroutines"] = runtime.NumGoroutine() // info["goroutines"] = runtime.NumGoroutine()
s, err := storage.Status() // s, err := storage.Status()
if err != nil { // if err != nil {
alert(err, "Failed to get Kyoto Cabinet status") // alert(err, "Failed to get Kyoto Cabinet status")
} // }
s = s[:len(s)-1] // Removing trailing new line // s = s[:len(s)-1] // Removing trailing new line
ks := make(map[string]interface{}) // ks := make(map[string]interface{})
tokens := strings.Split(s, "\n") // tokens := strings.Split(s, "\n")
for _, t := range tokens { // for _, t := range tokens {
tt := strings.Split(t, "\t") // tt := strings.Split(t, "\t")
num, err := strconv.Atoi(tt[1]) // num, err := strconv.Atoi(tt[1])
if err != nil { // if err != nil {
ks[tt[0]] = tt[1] // ks[tt[0]] = tt[1]
} else { // } else {
ks[tt[0]] = num // ks[tt[0]] = num
} // }
} // }
info["kyoto_cabinet"] = ks // info["kyoto_cabinet"] = ks
jsn, _ := json.Marshal(info) // jsn, _ := json.Marshal(info)
w.Write(jsn) // w.Write(jsn)
} }
func publishHandler(w http.ResponseWriter, r *http.Request) { func pubHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() defer r.Body.Close()
msg, _ := ioutil.ReadAll(r.Body) msg, _ := ioutil.ReadAll(r.Body)
if len(msg) == 0 { if len(msg) == 0 {
msg = message(r.FormValue("msg")) msg = []byte(r.FormValue("msg"))
} }
queue := r.FormValue("queue")
qname := r.FormValue("queue") if ok := theHub.Pub(queue, msg); ok {
ok := registerPublication(qname, msg)
if ok {
w.Write([]byte("OK")) w.Write([]byte("OK"))
} else { } else {
http.Error(w, "FAIL", 500) http.Error(w, "FAIL", 500)
} }
} }
func subscriptionHandler(w http.ResponseWriter, r *http.Request) { func subHandler(w http.ResponseWriter, r *http.Request) {
rch := make(chan response) result := make(chan hub.Result)
abort := make(chan bool, 1) queues := strings.Split(r.FormValue("queues"), ",")
req := &request{ sub := hub.NewSubscription(queues, result)
queues: strings.Split(r.FormValue("queues"), ","),
responseCh: rch,
abort: abort,
}
go registerSubscription(req)
disconnected := w.(http.CloseNotifier).CloseNotify() disconnected := w.(http.CloseNotifier).CloseNotify()
finished := make(chan bool) finished := make(chan struct{})
go func() { go func() {
select { select {
case <-disconnected: case <-disconnected:
close(rch) sub.Close()
abort <- true close(finished)
case <-finished: case <-finished:
} }
req.purge()
}() }()
defer sub.Close()
res, ok := <-rch theHub.Sub(sub)
if !ok { res := <-result
return
}
w.Header().Set("Queue", res.queue) w.Header().Set("Queue", res.Queue)
w.Write(res.message) w.Write(res.Message)
finished <- true finished <- struct{}{}
}
func setupServer() {
http.HandleFunc("/status", statusHandler)
http.HandleFunc("/debug", debugHandler)
http.HandleFunc("/publish", publishHandler)
http.HandleFunc("/subscribe", subscriptionHandler)
} }