From 1cce73211f962991ef741b01bdebaa27b36d254c Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Thu, 17 Jul 2014 00:50:05 +0700 Subject: [PATCH] Stop polling counter on disconnect --- counter.go | 9 +++++++-- main.go | 10 +++++----- queue.go | 9 ++++----- request.go | 11 ++++++++--- server.go | 5 +++-- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/counter.go b/counter.go index b0acfaf..5717f72 100644 --- a/counter.go +++ b/counter.go @@ -48,8 +48,13 @@ func (c *Counter) Write(proc func(i uint) bool) { } } -func (c *Counter) Read() uint { - return <-c.stream +func (c *Counter) Read(abort chan bool) uint { + select { + case i := <-c.stream: + return i + case <-abort: + return 0 + } } func (c *Counter) Distance() uint { diff --git a/main.go b/main.go index b1b007f..44853f5 100644 --- a/main.go +++ b/main.go @@ -19,8 +19,7 @@ func HandleShutdown() { SaveState() Log("State successfully persisted") - storage.Close() - Log("Storage closed") + CloseStorage() Log("Waiting for rollbar...") rollbar.Wait() @@ -38,9 +37,10 @@ func main() { HandleShutdown() LoadState() go KeepStatePersisted() - go PersistMessages() - port := fmt.Sprintf(":%d", Config.Port) - http.ListenAndServe(port, nil) + err := http.ListenAndServe(port, nil) + if err != nil { + Error(err, "Error starting server on port %d", Config.Port) + } } diff --git a/queue.go b/queue.go index 0eb4a87..3df1587 100644 --- a/queue.go +++ b/queue.go @@ -3,7 +3,6 @@ package main type ( Queue struct { Name string - Counter Counter Counter *Counter } ) @@ -28,16 +27,16 @@ func (q *Queue) Push(msg Message) bool { return (err == nil) } -func (q *Queue) TryFetch() (Message, bool) { +func (q *Queue) TryFetch(abort chan bool) (Message, bool) { if q.Counter.Distance() > 0 { - return q.Fetch() + return q.Fetch(abort) } else { return Message{}, false } } -func (q *Queue) Fetch() (Message, bool) { - i := q.Counter.Read() +func (q *Queue) Fetch(abort chan bool) (Message, bool) { + i := q.Counter.Read(abort) key := NewKey(q.Name, i) msg, err := storage.Get(key) diff --git a/request.go b/request.go index 78075ce..bce61d5 100644 --- a/request.go +++ b/request.go @@ -8,6 +8,7 @@ type ( Request struct { Queues []string Callback func(*Response) + Abort chan bool } Response struct { Queue string @@ -19,23 +20,27 @@ var ( pool = []*Request{} ) -func Register(q string, msg Message) { +func Register(q string, msg Message) bool { for i, r := range pool { for _, queueName := range r.Queues { if queueName == q { go r.Callback(&Response{Queue: queueName, Message: msg}) pool = append(pool[:i], pool[i+1:]...) return + + return true } } } - GetQueue(q).Push(msg) + + ok := GetQueue(q).Push(msg) + return ok } func Process(r *Request) { for _, queueName := range r.Queues { q := GetQueue(queueName) - msg, ok := q.TryFetch() + msg, ok := q.TryFetch(r.Abort) if ok { go r.Callback(&Response{Queue: queueName, Message: msg}) return diff --git a/server.go b/server.go index 9d0d45a..db06cde 100644 --- a/server.go +++ b/server.go @@ -57,13 +57,14 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) { func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { rch := make(chan *Response) + abort := make(chan bool, 1) req := &Request{ Queues: strings.Split(r.FormValue("queues"), ","), Callback: func(r *Response) { rch <- r }, + Abort: abort, } - go Process(req) disconnected := w.(http.CloseNotifier).CloseNotify() @@ -72,8 +73,8 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { select { case <-disconnected: rch <- nil + abort <- true case <-finished: - break } Purge(req) }()