diff --git a/config.go b/config.go index e25df8e..2aecd03 100644 --- a/config.go +++ b/config.go @@ -5,14 +5,14 @@ import ( ) var ( - Config struct { - Storage string - Port int + config struct { + storage string + port int } ) -func SetupConfig() { - flag.StringVar(&Config.Storage, "storage", "-", "Kyoto Cabinet storage path (e.g. burlesque.kch#dfunit=8#msiz=512M)") - flag.IntVar(&Config.Port, "port", 4401, "Server HTTP port") +func setupConfig() { + flag.StringVar(&config.storage, "storage", "-", "Kyoto Cabinet storage path (e.g. burlesque.kch#dfunit=8#msiz=512M)") + flag.IntVar(&config.port, "port", 4401, "Server HTTP port") flag.Parse() } diff --git a/counter.go b/counter.go index 7e8748a..6232ba1 100644 --- a/counter.go +++ b/counter.go @@ -5,64 +5,64 @@ import ( ) const ( - MaxIndex = ^uint(0) + maxIndex = ^uint(0) ) type ( // Counter is responsible for operating queue read and write indexes - Counter struct { - WriteIndex uint // Number of the record last written to the queue - ReadIndex uint // Number of the record last read from the queue + counter struct { + writeIndex uint // Number of the record last written to the queue + readIndex uint // Number of the record last read from the queue // If WriteIndex is greater than ReadIndex then there are unread messages // If WriteIndex is less tham ReadIndex then MaxIndex was reached - Read chan uint + read chan uint mutex sync.Mutex streaming *sync.Cond } ) -func NewCounter(wi, ri uint) *Counter { +func newCounter(wi, ri uint) *counter { m := &sync.Mutex{} m.Lock() - c := &Counter{ - WriteIndex: wi, - ReadIndex: ri, - Read: make(chan uint), + c := &counter{ + writeIndex: wi, + readIndex: ri, + read: make(chan uint), streaming: sync.NewCond(m), } - go c.Stream() + go c.stream() return c } -func (c *Counter) Write(proc func(i uint) bool) { +func (c *counter) write(proc func(i uint) bool) { c.mutex.Lock() defer c.mutex.Unlock() - ok := proc(c.WriteIndex + 1) + ok := proc(c.writeIndex + 1) if ok { - c.WriteIndex++ + c.writeIndex++ c.streaming.Signal() } } -func (c *Counter) Distance() uint { - d := c.WriteIndex - c.ReadIndex +func (c *counter) distance() uint { + d := c.writeIndex - c.readIndex if d < 0 { - d += MaxIndex + d += maxIndex } return d } -func (c *Counter) Stream() { +func (c *counter) stream() { for { - if c.Distance() == 0 { + if c.distance() == 0 { c.streaming.Wait() } - c.Read <- c.ReadIndex + 1 - c.ReadIndex++ + c.read <- c.readIndex + 1 + c.readIndex++ } } diff --git a/logging.go b/logging.go index 38a9076..1f1d48f 100644 --- a/logging.go +++ b/logging.go @@ -1,29 +1,29 @@ package main import ( - "log" + loglib "log" "os" "runtime" ) var ( - logger *log.Logger + logger *loglib.Logger ) -func SetupLogging() { - logger = log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) +func setupLogging() { + logger = loglib.New(os.Stdout, "", loglib.Ldate|loglib.Lmicroseconds) - Log("Burlesque v%s started", Version) - Log("GOMAXPROCS is set to %d", runtime.GOMAXPROCS(-1)) - Log("Storage path: %s", Config.Storage) - Log("Server is running at http://127.0.0.1:%d", Config.Port) + log("Burlesque v%s started", version) + log("GOMAXPROCS is set to %d", runtime.GOMAXPROCS(-1)) + log("Storage path: %s", config.storage) + log("Server is running at http://127.0.0.1:%d", config.port) } -func Log(format string, args ...interface{}) { +func log(format string, args ...interface{}) { logger.Printf("[INFO] "+format, args...) } -func Error(err error, format string, args ...interface{}) { +func alert(err error, format string, args ...interface{}) { logger.Printf("[ERROR] "+format, args...) logger.Printf(" %s", err.Error()) } diff --git a/main.go b/main.go index b8ca843..77e8f7c 100644 --- a/main.go +++ b/main.go @@ -7,33 +7,33 @@ import ( ) const ( - Version = "0.1.3" + version = "0.1.3" ) -func HandleShutdown() { +func handleShutdown() { ch := make(chan os.Signal) signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) go func() { <-ch - SaveState() - Log("State successfully persisted") + saveState() + log("State successfully persisted") - CloseStorage() + closeStorage() - Log("Stopped") + log("Stopped") os.Exit(0) }() } func main() { - SetupConfig() - SetupLogging() - SetupStorage() - SetupServer() - HandleShutdown() - LoadState() - go KeepStatePersisted() - StartServer() + setupConfig() + setupLogging() + setupStorage() + setupServer() + handleShutdown() + loadState() + go keepStatePersisted() + startServer() } diff --git a/queue.go b/queue.go index 09ec445..9db7767 100644 --- a/queue.go +++ b/queue.go @@ -1,24 +1,24 @@ package main type ( - Queue struct { - Name string - Counter *Counter + queue struct { + name string + counter *counter } ) var ( - queues = make(map[string]*Queue) + queues = make(map[string]*queue) ) -func (q *Queue) Push(msg Message) bool { +func (q *queue) push(msg message) bool { var err error - q.Counter.Write(func(i uint) bool { - key := NewKey(q.Name, i) + q.counter.write(func(i uint) bool { + key := newKey(q.name, i) err = storage.Set(key, msg) if err != nil { - Error(err, "Failed to write %d bytes to record '%s'", len(msg), key) + alert(err, "Failed to write %d bytes to record '%s'", len(msg), key) } return (err == nil) @@ -27,49 +27,49 @@ func (q *Queue) Push(msg Message) bool { return (err == nil) } -func (q *Queue) TryFetch(abort chan bool) (Message, bool) { - if q.Counter.Distance() > 0 { - return q.Fetch(abort) +func (q *queue) tryFetch(abort chan bool) (message, bool) { + if q.counter.distance() > 0 { + return q.fetch(abort) } else { - return Message{}, false + return message{}, false } } -func (q *Queue) Fetch(abort chan bool) (Message, bool) { +func (q *queue) fetch(abort chan bool) (message, bool) { var i uint select { - case i = <-q.Counter.Read: + case i = <-q.counter.read: case <-abort: - return Message{}, false + return message{}, false } - key := NewKey(q.Name, i) - msg, err := storage.Get(key) + k := newKey(q.name, i) + msg, err := storage.Get(k) if err != nil { - Error(err, "Failed to read record '%s'", key) + alert(err, "Failed to read record '%s'", k) return msg, false } - err = storage.Remove(key) + err = storage.Remove(k) if err != nil { - Error(err, "Failed to delete record '%s'", key) + alert(err, "Failed to delete record '%s'", k) return msg, false } return msg, true } -func GetQueue(name string) *Queue { +func getQueue(name string) *queue { if _, ok := queues[name]; !ok { - RegisterQueue(name, 0, 0) + registerQueue(name, 0, 0) } return queues[name] } -func RegisterQueue(name string, wi, ri uint) { - queues[name] = &Queue{ - Name: name, - Counter: NewCounter(wi, ri), +func registerQueue(name string, wi, ri uint) { + queues[name] = &queue{ + name: name, + counter: newCounter(wi, ri), } } diff --git a/request.go b/request.go index eb6eefc..7bc1c28 100644 --- a/request.go +++ b/request.go @@ -5,34 +5,34 @@ import ( ) type ( - Request struct { - Queues []string - ResponseCh chan Response - Abort chan bool - Dead bool + request struct { + queues []string + responseCh chan response + abort chan bool + dead bool } - Response struct { - Queue string - Message Message + response struct { + queue string + message message } ) var ( pool struct { - Requests []*Request + requests []*request mutex sync.Mutex } ) -func RegisterPublication(q string, msg Message) bool { - for _, r := range pool.Requests { - if r.Dead { +func registerPublication(q string, msg message) bool { + for _, r := range pool.requests { + if r.dead { continue } - for _, qname := range r.Queues { + for _, qname := range r.queues { if qname == q { - rsp := Response{Queue: q, Message: msg} - ok := r.TryRespond(rsp) + rsp := response{queue: q, message: msg} + ok := r.tryRespond(rsp) if ok { return true } @@ -40,41 +40,41 @@ func RegisterPublication(q string, msg Message) bool { } } - ok := GetQueue(q).Push(msg) + ok := getQueue(q).push(msg) return ok } -func RegisterSubscription(r *Request) { - for _, qname := range r.Queues { - q := GetQueue(qname) - msg, ok := q.TryFetch(r.Abort) +func registerSubscription(r *request) { + for _, qname := range r.queues { + q := getQueue(qname) + msg, ok := q.tryFetch(r.abort) if ok { - rsp := Response{Queue: qname, Message: msg} - ok := r.TryRespond(rsp) + rsp := response{queue: qname, message: msg} + ok := r.tryRespond(rsp) if !ok { - q.Push(msg) + q.push(msg) } return } } - pool.Requests = append(pool.Requests, r) + pool.requests = append(pool.requests, r) } -func (r *Request) TryRespond(rsp Response) bool { +func (r *request) tryRespond(rsp response) bool { okch := make(chan bool) go func() { defer func() { err := recover() if err != nil { // Panic! - r.Dead = true + r.dead = true okch <- false } }() - r.ResponseCh <- rsp // If channel is already closed expect a panic + r.responseCh <- rsp // If channel is already closed expect a panic okch <- true }() @@ -82,15 +82,15 @@ func (r *Request) TryRespond(rsp Response) bool { return ok } -func (r *Request) Purge() { +func (r *request) purge() { pool.mutex.Lock() defer pool.mutex.Unlock() - r.Dead = true + r.dead = true deleted := 0 - for i, req := range pool.Requests { - if req.Dead { - pool.Requests = append(pool.Requests[:i-deleted], pool.Requests[i-deleted+1:]...) + for i, req := range pool.requests { + if req.dead { + pool.requests = append(pool.requests[:i-deleted], pool.requests[i-deleted+1:]...) deleted++ } } diff --git a/server.go b/server.go index 2e871d3..652c90d 100644 --- a/server.go +++ b/server.go @@ -10,26 +10,26 @@ import ( "strings" ) -func StartServer() { - port := fmt.Sprintf(":%d", Config.Port) +func startServer() { + port := fmt.Sprintf(":%d", config.port) err := http.ListenAndServe(port, nil) if err != nil { - Error(err, "Error starting server on port %d", Config.Port) + alert(err, "Error starting server on port %d", config.port) } } -func StatusHandler(w http.ResponseWriter, r *http.Request) { +func statusHandler(w http.ResponseWriter, r *http.Request) { info := make(map[string]map[string]uint) for _, q := range queues { - info[q.Name] = map[string]uint{ - "messages": q.Counter.Distance(), + info[q.name] = map[string]uint{ + "messages": q.counter.distance(), "subscriptions": 0, } } - for _, r := range pool.Requests { - for _, q := range r.Queues { + for _, r := range pool.requests { + for _, q := range r.queues { info[q]["subscriptions"]++ } } @@ -38,14 +38,14 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { w.Write(jsn) } -func DebugHandler(w http.ResponseWriter, r *http.Request) { +func debugHandler(w http.ResponseWriter, r *http.Request) { info := make(map[string]interface{}) - info["version"] = Version + info["version"] = version info["goroutines"] = runtime.NumGoroutine() s, err := storage.Status() if err != nil { - Error(err, "Failed to get Kyoto Cabinet status") + alert(err, "Failed to get Kyoto Cabinet status") } s = s[:len(s)-1] // Removing trailing new line @@ -66,15 +66,15 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) { w.Write(jsn) } -func PublishHandler(w http.ResponseWriter, r *http.Request) { +func publishHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() msg, _ := ioutil.ReadAll(r.Body) if len(msg) == 0 { - msg = Message(r.FormValue("msg")) + msg = message(r.FormValue("msg")) } qname := r.FormValue("queue") - ok := RegisterPublication(qname, msg) + ok := registerPublication(qname, msg) if ok { w.Write([]byte("OK")) @@ -83,15 +83,15 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) { } } -func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { - rch := make(chan Response) +func subscriptionHandler(w http.ResponseWriter, r *http.Request) { + rch := make(chan response) abort := make(chan bool, 1) - req := &Request{ - Queues: strings.Split(r.FormValue("queues"), ","), - ResponseCh: rch, - Abort: abort, + req := &request{ + queues: strings.Split(r.FormValue("queues"), ","), + responseCh: rch, + abort: abort, } - go RegisterSubscription(req) + go registerSubscription(req) disconnected := w.(http.CloseNotifier).CloseNotify() finished := make(chan bool) @@ -102,7 +102,7 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { abort <- true case <-finished: } - req.Purge() + req.purge() }() res, ok := <-rch @@ -110,15 +110,15 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set("Queue", res.Queue) - w.Write(res.Message) + w.Header().Set("Queue", res.queue) + w.Write(res.message) finished <- true } -func SetupServer() { - http.HandleFunc("/status", StatusHandler) - http.HandleFunc("/debug", DebugHandler) - http.HandleFunc("/publish", PublishHandler) - http.HandleFunc("/subscribe", SubscriptionHandler) +func setupServer() { + http.HandleFunc("/status", statusHandler) + http.HandleFunc("/debug", debugHandler) + http.HandleFunc("/publish", publishHandler) + http.HandleFunc("/subscribe", subscriptionHandler) } diff --git a/state.go b/state.go index 5e3e3be..45d230c 100644 --- a/state.go +++ b/state.go @@ -6,63 +6,64 @@ import ( ) type ( - QueueState map[string]uint - ServerState map[string]QueueState + queueState map[string]uint + serverState map[string]queueState ) const ( - StateMetaKey = "state" - StateSaveInterval = 1 // seconds + stateMetaKey = "state" + stateSaveInterval = 1 // seconds ) -func SaveState() { - state := make(ServerState) +func saveState() { + state := make(serverState) for _, q := range queues { - state[q.Name] = QueueState{ - "wi": q.Counter.WriteIndex, - "ri": q.Counter.ReadIndex, + state[q.name] = queueState{ + "wi": q.counter.writeIndex, + "ri": q.counter.readIndex, } } jsn, _ := json.Marshal(state) - key := Key(StateMetaKey) - if err := storage.Set(key, jsn); err != nil { - Error(err, "Failed to persist state") + k := key(stateMetaKey) + if err := storage.Set(k, jsn); err != nil { + alert(err, "Failed to persist state") return } } -func LoadState() { - state := make(ServerState) - key := Key(StateMetaKey) +func loadState() { + state := make(serverState) + k := key(stateMetaKey) - jsn, err := storage.Get(key) + jsn, err := storage.Get(k) if err != nil { - Log("State not found") + log("State not found") return } err = json.Unmarshal(jsn, &state) if err != nil { - Log("Failed to load state") + log("Failed to load state") return } for qname, meta := range state { - RegisterQueue(qname, meta["wi"], meta["ri"]) + registerQueue(qname, meta["wi"], meta["ri"]) } - Log("State successfully loaded") + log("State successfully loaded") } -func KeepStatePersisted() { - t := time.NewTicker(StateSaveInterval * time.Second) +func keepStatePersisted() { + t := time.NewTicker(stateSaveInterval * time.Second) + for { <-t.C - SaveState() + saveState() err := storage.Sync(false) if err != nil { - Error(err, "Failed to sync storage") + alert(err, "Failed to sync storage") } } } diff --git a/storage.go b/storage.go index 2c10120..b991f16 100644 --- a/storage.go +++ b/storage.go @@ -8,41 +8,42 @@ import ( ) type ( - Message []byte - Key []byte + message []byte + key []byte ) var ( storage = cabinet.New() ) -func NewKey(queue string, index uint) Key { +func newKey(queue string, index uint) key { istr := strconv.FormatUint(uint64(index), 10) - key := strings.Join([]string{queue, istr}, "_") - return Key(key) + k := strings.Join([]string{queue, istr}, "_") + + return key(k) } -func SetupStorage() { - err := storage.Open(Config.Storage, cabinet.KCOWRITER|cabinet.KCOCREATE) +func setupStorage() { + err := storage.Open(config.storage, cabinet.KCOWRITER|cabinet.KCOCREATE) if err != nil { - Error(err, "Failed to open database '%s'", Config.Storage) + alert(err, "Failed to open database '%s'", config.storage) } } -func CloseStorage() { +func closeStorage() { var err error err = storage.Sync(true) if err != nil { - Error(err, "Failed to sync storage (hard)") + alert(err, "Failed to sync storage (hard)") } else { - Log("Storage synchronized") + log("Storage synchronized") } err = storage.Close() if err != nil { - Error(err, "Failed to close storage") + alert(err, "Failed to close storage") } else { - Log("Storage closed") + log("Storage closed") } }