diff --git a/config.go b/config.go new file mode 100644 index 0000000..6ce662b --- /dev/null +++ b/config.go @@ -0,0 +1,29 @@ +package main + +import ( + "flag" + "fmt" +) + +type ( + Config struct { + Storage string + Env string + Port int + } +) + +var ( + cfg = Config{} +) + +func SetupConfig() { + cfg.Storage = *flag.String("storage", "-", "Kyoto Cabinet storage path (e.g. storage.kch#zcomp=gz#capsiz=524288000)") + cfg.Env = *flag.String("environment", "development", "Process environment: development or production") + cfg.Port = *flag.Int("port", 4401, "HTTP port to listen") + flag.Parse() +} + +func (c Config) PortString() string { + return fmt.Sprintf(":%d", cfg.Port) +} diff --git a/counter.go b/counter.go new file mode 100644 index 0000000..d48fd93 --- /dev/null +++ b/counter.go @@ -0,0 +1,51 @@ +package main + +import () + +const ( + MaxIndex = ^uint(0) +) + +type ( + Counter struct { + Write uint + Read uint + stream chan uint + inLoop bool + } +) + +func NewCounter(wi, ri uint) *Counter { + c := &Counter{Write: wi, Read: ri} + c.stream = make(chan uint) + go c.Loop() + return c +} + +func (c *Counter) Incr() { + c.Write++ + if !c.inLoop { + c.inLoop = true + go c.Loop() + } +} + +func (c *Counter) Next() uint { + return <-c.stream +} + +func (c *Counter) Distance() uint { + d := c.Write - c.Read + if d < 0 { + d += MaxIndex + } + return d +} + +func (c *Counter) Loop() { + for c.Write > c.Read { + c.stream <- c.Read + 1 + c.Read++ + } + c.inLoop = false +} diff --git a/main.go b/main.go index 3d7e457..7d343cd 100644 --- a/main.go +++ b/main.go @@ -1,389 +1,64 @@ package main import ( - "encoding/json" - "flag" - "fmt" - "github.com/ezotrank/cabinetgo" "github.com/stvp/rollbar" - "io/ioutil" logpkg "log" "net/http" "os" "os/signal" - "reflect" "runtime" "strconv" "strings" "syscall" - "time" ) type ( Message []byte Key []byte - Counter struct { - Write uint - Read uint - stream chan uint - inLoop bool - } - Queue struct { - Name string - Counter *Counter - } - Request struct { - Queues []string - Callback func(*Response) - } - Response struct { - Queue string - Message Message - } - Payload struct { - Queue *Queue - Message Message - } - QueueState map[string]uint - ServerState map[string]QueueState ) -const ( - MaxIndex = ^uint(0) - StateMetaKey = "state" - StateSaveInterval = 10 -) - -// -// Key -// - func NewKey(queue string, index uint) Key { istr := strconv.FormatUint(uint64(index), 10) key := strings.Join([]string{queue, istr}, "_") return Key(key) } -// -// Counter -// - -func NewCounter(wi, ri uint) *Counter { - c := &Counter{Write: wi, Read: ri} - c.stream = make(chan uint) - go c.Loop() - return c -} - -func (c *Counter) Incr() { - c.Write++ - if !c.inLoop { - c.inLoop = true - go c.Loop() - } -} - -func (c *Counter) Next() uint { - return <-c.stream -} - -func (c *Counter) Distance() uint { - return c.Write - c.Read -} - -func (c *Counter) Loop() { - for c.Write > c.Read { - c.stream <- c.Read + 1 - c.Read++ - } - c.inLoop = false -} - -// -// Queue -// - -func PersistMessages() { - for { - select { - case payload := <-saver: - i := payload.Queue.Counter.Write + 1 - key := NewKey(payload.Queue.Name, i) - - if err := storage.Set(key, payload.Message); err != nil { - rollbar.Error("error", err) - } else { - payload.Queue.Counter.Incr() - } - } - } -} - -func (q *Queue) Push(msg Message) { - saver <- Payload{Queue: q, Message: msg} -} - -func (q *Queue) Fetch() (Message, error) { - i := q.Counter.Next() - key := NewKey(q.Name, i) - msg, err := storage.Get(key) - if err != nil { - rollbar.Error("error", err) - return msg, err - } - - defer func() { - if err := storage.Remove(key); err != nil { - rollbar.Error("error", err) - } - }() - - return msg, nil -} - -func (q *Queue) Size() uint { - size := q.Counter.Distance() - if size < 0 { - size += MaxIndex - } - return size -} - -func GetQueue(name string) *Queue { - if _, ok := queues[name]; !ok { - RegisterQueue(name, 0, 0) - } - return queues[name] -} - -func RegisterQueue(name string, wi, ri uint) { - queues[name] = &Queue{Name: name, Counter: NewCounter(wi, ri)} -} - -// -// Request -// - -func Register(q string, msg Message) { - for i, r := range pool { - for _, queueName := range r.Queues { - if queueName == q { - go r.Callback(&Response{Queue: queueName, Message: msg}) - pool = append(pool[:i], pool[i+1:]...) - return - } - } - } - GetQueue(q).Push(msg) -} - -func Process(r *Request) { - for _, queueName := range r.Queues { - q := GetQueue(queueName) - if q.Size() > 0 { - if msg, err := q.Fetch(); err != nil { - go r.Callback(nil) - } else { - go r.Callback(&Response{Queue: queueName, Message: msg}) - } - return - } - } - pool = append(pool, r) -} - -func Purge(r *Request) { - for i, req := range pool { - if reflect.ValueOf(r).Pointer() == reflect.ValueOf(req).Pointer() { - pool = append(pool[:i], pool[i+1:]...) - return - } - } -} - -// -// State -// - -func SaveState() { - state := make(ServerState) - for _, q := range queues { - state[q.Name] = QueueState{ - "wi": q.Counter.Write, - "ri": q.Counter.Read, - } - } - - stateJson, _ := json.Marshal(state) - key := Key(StateMetaKey) - if err := storage.Set(key, stateJson); err != nil { - rollbar.Error("error", err) - log.Printf("Failed to persist state") - return - } -} - -func LoadState() { - state := make(ServerState) - key := Key(StateMetaKey) - - stateJson, err := storage.Get(key) - if err != nil { - log.Printf("State not found") - return - } - - if err := json.Unmarshal(stateJson, &state); err != nil { - rollbar.Error("error", err) - log.Printf("Failed to load state") - return - } - - for queueName, meta := range state { - RegisterQueue(queueName, meta["wi"], meta["ri"]) - } - - log.Printf("State successfully loaded") -} - -func KeepStatePersisted() { - t := time.NewTicker(time.Second) - for { - <-t.C - SaveState() - } -} - -// -// HTTP handlers -// - -func StatusHandler(w http.ResponseWriter, r *http.Request) { - info := make(map[string]map[string]uint) - for _, q := range queues { - info[q.Name] = map[string]uint{ - "messages": q.Size(), - "subscriptions": 0, - } - } - for _, r := range pool { - for _, q := range r.Queues { - info[q]["subscriptions"]++ - } - } - infoJson, _ := json.Marshal(info) - w.Write(infoJson) -} - -func DebugHandler(w http.ResponseWriter, r *http.Request) { - info := make(map[string]int) - info["goroutines"] = runtime.NumGoroutine() - infoJson, _ := json.Marshal(info) - w.Write(infoJson) -} - -func PublishHandler(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - msg, _ := ioutil.ReadAll(r.Body) - if len(msg) == 0 { - msg = Message(r.FormValue("msg")) - } - queueName := r.FormValue("queue") - - go Register(queueName, msg) - log.Println("Published message of", len(msg), "bytes to queue", queueName) - w.Write([]byte("OK")) -} - -func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { - rch := make(chan *Response) - req := &Request{ - Queues: strings.Split(r.FormValue("queues"), ","), - Callback: func(r *Response) { - rch <- r - }, - } - - go Process(req) - - disconnected := w.(http.CloseNotifier).CloseNotify() - finished := make(chan bool) - go func() { - select { - case <-disconnected: - rch <- nil - case <-finished: - break - } - Purge(req) - }() - - res := <-rch - if res == nil { - return - } - - w.Header().Set("Queue", res.Queue) - w.Write(res.Message) - - log.Println("Recieved message of", len(res.Message), "bytes from queue", res.Queue) - finished <- true -} - -// -// main -// - var ( - log *logpkg.Logger - storage = cabinet.New() - queues = make(map[string]*Queue) - pool = []*Request{} - saver = make(chan Payload, 1000) + log *logpkg.Logger ) -func main() { - log = logpkg.New(os.Stdout, "", logpkg.Ldate|logpkg.Lmicroseconds) +func HandleShutdown() { + ch := make(chan os.Signal) + signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) - storagep := flag.String("storage", "-", "Kyoto Cabinet storage path (e.g. storage.kch#zcomp=gz#capsiz=524288000)") - env := flag.String("environment", "development", "Process environment: development or production") - port := flag.Int("port", 4401, "HTTP port to listen") - flag.Parse() - - rollbar.Token = "***REMOVED***" // klit access token - rollbar.Environment = *env - - // Init storage - err := storage.Open(*storagep, cabinet.KCOWRITER|cabinet.KCOCREATE) - if err != nil { - panic(err) - } - - // Handle SIGTERM - shutdown := make(chan os.Signal) - signal.Notify(shutdown, os.Interrupt, os.Kill, syscall.SIGTERM) go func() { - <-shutdown + <-ch SaveState() log.Printf("State successfully persisted") storage.Close() rollbar.Wait() log.Println("Storage closed") log.Printf("Server stopped") - os.Exit(0) + os.Exit(1) }() +} +func main() { + log = logpkg.New(os.Stdout, "", logpkg.Ldate|logpkg.Lmicroseconds) + + rollbar.Token = "***REMOVED***" // klit access token + rollbar.Environment = cfg.Env + + SetupConfig() + SetupStorage() + SetupServer() + HandleShutdown() LoadState() - go KeepStatePersisted() go PersistMessages() log.Printf("GOMAXPROCS = %d", runtime.GOMAXPROCS(-1)) - log.Printf("Starting HTTP server on port %d", *port) + log.Printf("Starting HTTP server on port %d", cfg.Port) - http.HandleFunc("/status", StatusHandler) - http.HandleFunc("/debug", DebugHandler) - http.HandleFunc("/publish", PublishHandler) - http.HandleFunc("/subscribe", SubscriptionHandler) - http.ListenAndServe(fmt.Sprintf(":%d", *port), nil) + http.ListenAndServe(cfg.PortString(), nil) } diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..b086f78 --- /dev/null +++ b/queue.go @@ -0,0 +1,50 @@ +package main + +import ( + "github.com/stvp/rollbar" +) + +type ( + Queue struct { + Name string + Counter *Counter + } +) + +var ( + queues = make(map[string]*Queue) +) + +func (q *Queue) Push(msg Message) { + saver <- Payload{Queue: q, Message: msg} +} + +func (q *Queue) Fetch() (Message, error) { + i := q.Counter.Next() + key := NewKey(q.Name, i) + + msg, err := storage.Get(key) + if err != nil { + rollbar.Error("error", err) + return msg, err + } + + defer func() { + if err := storage.Remove(key); err != nil { + rollbar.Error("error", err) + } + }() + + return msg, nil +} + +func GetQueue(name string) *Queue { + if _, ok := queues[name]; !ok { + RegisterQueue(name, 0, 0) + } + return queues[name] +} + +func RegisterQueue(name string, wi, ri uint) { + queues[name] = &Queue{Name: name, Counter: NewCounter(wi, ri)} +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..ce4cbe2 --- /dev/null +++ b/request.go @@ -0,0 +1,57 @@ +package main + +import ( + "reflect" +) + +type ( + Request struct { + Queues []string + Callback func(*Response) + } + Response struct { + Queue string + Message Message + } +) + +var ( + pool = []*Request{} +) + +func Register(q string, msg Message) { + for i, r := range pool { + for _, queueName := range r.Queues { + if queueName == q { + go r.Callback(&Response{Queue: queueName, Message: msg}) + pool = append(pool[:i], pool[i+1:]...) + return + } + } + } + GetQueue(q).Push(msg) +} + +func Process(r *Request) { + for _, queueName := range r.Queues { + q := GetQueue(queueName) + if q.Counter.Distance() > 0 { + if msg, err := q.Fetch(); err != nil { + go r.Callback(nil) + } else { + go r.Callback(&Response{Queue: queueName, Message: msg}) + } + return + } + } + pool = append(pool, r) +} + +func Purge(r *Request) { + for i, req := range pool { + if reflect.ValueOf(r).Pointer() == reflect.ValueOf(req).Pointer() { + pool = append(pool[:i], pool[i+1:]...) + return + } + } +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..ada5cc3 --- /dev/null +++ b/server.go @@ -0,0 +1,93 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "runtime" + "strings" +) + +func StatusHandler(w http.ResponseWriter, r *http.Request) { + info := make(map[string]map[string]uint) + + for _, q := range queues { + info[q.Name] = map[string]uint{ + "messages": q.Counter.Distance(), + "subscriptions": 0, + } + } + + for _, r := range pool { + for _, q := range r.Queues { + info[q]["subscriptions"]++ + } + } + + infoJson, _ := json.Marshal(info) + w.Write(infoJson) +} + +func DebugHandler(w http.ResponseWriter, r *http.Request) { + info := make(map[string]int) + info["goroutines"] = runtime.NumGoroutine() + + infoJson, _ := json.Marshal(info) + w.Write(infoJson) +} + +func PublishHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + msg, _ := ioutil.ReadAll(r.Body) + if len(msg) == 0 { + msg = Message(r.FormValue("msg")) + } + + queueName := r.FormValue("queue") + go Register(queueName, msg) + + log.Println("Published message of", len(msg), "bytes to queue", queueName) + w.Write([]byte("OK")) +} + +func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { + rch := make(chan *Response) + req := &Request{ + Queues: strings.Split(r.FormValue("queues"), ","), + Callback: func(r *Response) { + rch <- r + }, + } + + go Process(req) + + disconnected := w.(http.CloseNotifier).CloseNotify() + finished := make(chan bool) + go func() { + select { + case <-disconnected: + rch <- nil + case <-finished: + break + } + Purge(req) + }() + + res := <-rch + if res == nil { + return + } + + w.Header().Set("Queue", res.Queue) + w.Write(res.Message) + + log.Println("Recieved message of", len(res.Message), "bytes from queue", res.Queue) + finished <- true +} + +func SetupServer() { + http.HandleFunc("/status", StatusHandler) + http.HandleFunc("/debug", DebugHandler) + http.HandleFunc("/publish", PublishHandler) + http.HandleFunc("/subscribe", SubscriptionHandler) +} diff --git a/state.go b/state.go new file mode 100644 index 0000000..1b4b670 --- /dev/null +++ b/state.go @@ -0,0 +1,66 @@ +package main + +import ( + "encoding/json" + "github.com/stvp/rollbar" + "time" +) + +type ( + QueueState map[string]uint + ServerState map[string]QueueState +) + +const ( + StateMetaKey = "state" + StateSaveInterval = 10 +) + +func SaveState() { + state := make(ServerState) + for _, q := range queues { + state[q.Name] = QueueState{ + "wi": q.Counter.Write, + "ri": q.Counter.Read, + } + } + + stateJson, _ := json.Marshal(state) + key := Key(StateMetaKey) + if err := storage.Set(key, stateJson); err != nil { + rollbar.Error("error", err) + log.Printf("Failed to persist state") + return + } +} + +func LoadState() { + state := make(ServerState) + key := Key(StateMetaKey) + + stateJson, err := storage.Get(key) + if err != nil { + log.Printf("State not found") + return + } + + if err := json.Unmarshal(stateJson, &state); err != nil { + rollbar.Error("error", err) + log.Printf("Failed to load state") + return + } + + for queueName, meta := range state { + RegisterQueue(queueName, meta["wi"], meta["ri"]) + } + + log.Printf("State successfully loaded") +} + +func KeepStatePersisted() { + t := time.NewTicker(time.Second) + for { + <-t.C + SaveState() + } +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..43858b0 --- /dev/null +++ b/storage.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/ezotrank/cabinetgo" + "github.com/stvp/rollbar" +) + +type ( + Payload struct { + Queue *Queue + Message Message + } +) + +var ( + storage = cabinet.New() + saver = make(chan Payload, 1000) +) + +func SetupStorage() { + err := storage.Open(cfg.Storage, cabinet.KCOWRITER|cabinet.KCOCREATE) + if err != nil { + panic(err) + } +} + +func PersistMessages() { + for { + payload := <-saver + i := payload.Queue.Counter.Write + 1 + key := NewKey(payload.Queue.Name, i) + + if err := storage.Set(key, payload.Message); err != nil { + rollbar.Error("error", err) + } else { + payload.Queue.Counter.Incr() + } + } +}