Stop polling counter on disconnect

This commit is contained in:
Gregory Eremin 2014-07-17 00:50:05 +07:00
parent a84f1241a9
commit 1cce73211f
5 changed files with 27 additions and 17 deletions

View File

@ -48,8 +48,13 @@ func (c *Counter) Write(proc func(i uint) bool) {
} }
} }
func (c *Counter) Read() uint { func (c *Counter) Read(abort chan bool) uint {
return <-c.stream select {
case i := <-c.stream:
return i
case <-abort:
return 0
}
} }
func (c *Counter) Distance() uint { func (c *Counter) Distance() uint {

10
main.go
View File

@ -19,8 +19,7 @@ func HandleShutdown() {
SaveState() SaveState()
Log("State successfully persisted") Log("State successfully persisted")
storage.Close() CloseStorage()
Log("Storage closed")
Log("Waiting for rollbar...") Log("Waiting for rollbar...")
rollbar.Wait() rollbar.Wait()
@ -38,9 +37,10 @@ func main() {
HandleShutdown() HandleShutdown()
LoadState() LoadState()
go KeepStatePersisted() go KeepStatePersisted()
go PersistMessages()
port := fmt.Sprintf(":%d", Config.Port) port := fmt.Sprintf(":%d", Config.Port)
http.ListenAndServe(port, nil) err := http.ListenAndServe(port, nil)
if err != nil {
Error(err, "Error starting server on port %d", Config.Port)
}
} }

View File

@ -3,7 +3,6 @@ package main
type ( type (
Queue struct { Queue struct {
Name string Name string
Counter Counter
Counter *Counter Counter *Counter
} }
) )
@ -28,16 +27,16 @@ func (q *Queue) Push(msg Message) bool {
return (err == nil) return (err == nil)
} }
func (q *Queue) TryFetch() (Message, bool) { func (q *Queue) TryFetch(abort chan bool) (Message, bool) {
if q.Counter.Distance() > 0 { if q.Counter.Distance() > 0 {
return q.Fetch() return q.Fetch(abort)
} else { } else {
return Message{}, false return Message{}, false
} }
} }
func (q *Queue) Fetch() (Message, bool) { func (q *Queue) Fetch(abort chan bool) (Message, bool) {
i := q.Counter.Read() i := q.Counter.Read(abort)
key := NewKey(q.Name, i) key := NewKey(q.Name, i)
msg, err := storage.Get(key) msg, err := storage.Get(key)

View File

@ -8,6 +8,7 @@ type (
Request struct { Request struct {
Queues []string Queues []string
Callback func(*Response) Callback func(*Response)
Abort chan bool
} }
Response struct { Response struct {
Queue string Queue string
@ -19,23 +20,27 @@ var (
pool = []*Request{} pool = []*Request{}
) )
func Register(q string, msg Message) { func Register(q string, msg Message) bool {
for i, r := range pool { for i, r := range pool {
for _, queueName := range r.Queues { for _, queueName := range r.Queues {
if queueName == q { if queueName == q {
go r.Callback(&Response{Queue: queueName, Message: msg}) go r.Callback(&Response{Queue: queueName, Message: msg})
pool = append(pool[:i], pool[i+1:]...) pool = append(pool[:i], pool[i+1:]...)
return return
return true
} }
} }
} }
GetQueue(q).Push(msg)
ok := GetQueue(q).Push(msg)
return ok
} }
func Process(r *Request) { func Process(r *Request) {
for _, queueName := range r.Queues { for _, queueName := range r.Queues {
q := GetQueue(queueName) q := GetQueue(queueName)
msg, ok := q.TryFetch() msg, ok := q.TryFetch(r.Abort)
if ok { if ok {
go r.Callback(&Response{Queue: queueName, Message: msg}) go r.Callback(&Response{Queue: queueName, Message: msg})
return return

View File

@ -57,13 +57,14 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) {
func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
rch := make(chan *Response) rch := make(chan *Response)
abort := make(chan bool, 1)
req := &Request{ req := &Request{
Queues: strings.Split(r.FormValue("queues"), ","), Queues: strings.Split(r.FormValue("queues"), ","),
Callback: func(r *Response) { Callback: func(r *Response) {
rch <- r rch <- r
}, },
Abort: abort,
} }
go Process(req) go Process(req)
disconnected := w.(http.CloseNotifier).CloseNotify() disconnected := w.(http.CloseNotifier).CloseNotify()
@ -72,8 +73,8 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
select { select {
case <-disconnected: case <-disconnected:
rch <- nil rch <- nil
abort <- true
case <-finished: case <-finished:
break
} }
Purge(req) Purge(req)
}() }()