Safely write to response channels

This commit is contained in:
Gregory Eremin 2014-07-17 20:22:37 +07:00
parent b71def6cf3
commit 580a618b5f
4 changed files with 58 additions and 36 deletions

View File

@ -6,7 +6,7 @@ import (
const ( const (
// With compression: burlesque.kch#opts=c#zcomp=gz#msiz=524288000 // With compression: burlesque.kch#opts=c#zcomp=gz#msiz=524288000
DefaultProductionStorage = "burlesque.kch#dfunit=100#msiz=512M" DefaultProductionStorage = "burlesque.kch#dfunit=8#msiz=512M"
) )
var ( var (

View File

@ -7,7 +7,7 @@ import (
type ( type (
Request struct { Request struct {
Queues []string Queues []string
Callback func(*Response) ResponseCh chan Response
Abort chan bool Abort chan bool
Dead bool Dead bool
} }
@ -25,37 +25,61 @@ var (
) )
func RegisterPublication(q string, msg Message) bool { func RegisterPublication(q string, msg Message) bool {
pool.mutex.Lock() for _, r := range pool.Requests {
for i, r := range pool.Requests { if r.Dead {
for _, queueName := range r.Queues { continue
if queueName == q { }
go r.Callback(&Response{Queue: queueName, Message: msg}) for _, qname := range r.Queues {
pool.Requests = append(pool.Requests[:i], pool.Requests[i+1:]...) if qname == q {
defer pool.mutex.Unlock() rsp := Response{Queue: q, Message: msg}
ok := r.TryRespond(rsp)
if ok {
return true return true
} }
} }
} }
pool.mutex.Unlock() }
ok := GetQueue(q).Push(msg) ok := GetQueue(q).Push(msg)
return ok return ok
} }
func RegisterSubscription(r *Request) { func RegisterSubscription(r *Request) {
for _, queueName := range r.Queues { for _, qname := range r.Queues {
q := GetQueue(queueName) q := GetQueue(qname)
msg, ok := q.TryFetch(r.Abort) msg, ok := q.TryFetch(r.Abort)
if ok { if ok {
go r.Callback(&Response{Queue: queueName, Message: msg}) rsp := Response{Queue: qname, Message: msg}
ok := r.TryRespond(rsp)
if !ok {
q.Push(msg)
}
return return
} }
} }
pool.mutex.Lock()
pool.Requests = append(pool.Requests, r) pool.Requests = append(pool.Requests, r)
pool.mutex.Unlock() }
func (r *Request) TryRespond(rsp Response) bool {
okch := make(chan bool)
go func() {
defer func() {
err := recover()
if err != nil {
r.Dead = true
okch <- false
}
}()
r.ResponseCh <- rsp
okch <- true
}()
ok := <-okch
return ok
} }
func (r *Request) Purge() { func (r *Request) Purge() {

View File

@ -49,26 +49,24 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) {
msg = Message(r.FormValue("msg")) msg = Message(r.FormValue("msg"))
} }
queueName := r.FormValue("queue") qname := r.FormValue("queue")
ok := RegisterPublication(queueName, msg) ok := RegisterPublication(qname, msg)
if ok { if ok {
Debug("Published message of %d bytes to queue %s", len(msg), queueName) Debug("Published message of %d bytes to queue %s", len(msg), qname)
w.Write([]byte("OK")) w.Write([]byte("OK"))
} else { } else {
Debug("Failed to publish message of %d bytes to queue %s", len(msg), queueName) Debug("Failed to publish message of %d bytes to queue %s", len(msg), qname)
http.Error(w, "FAIL", 500) http.Error(w, "FAIL", 500)
} }
} }
func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
rch := make(chan *Response) rch := make(chan Response)
abort := make(chan bool, 1) abort := make(chan bool, 1)
req := &Request{ req := &Request{
Queues: strings.Split(r.FormValue("queues"), ","), Queues: strings.Split(r.FormValue("queues"), ","),
Callback: func(r *Response) { ResponseCh: rch,
rch <- r
},
Abort: abort, Abort: abort,
} }
go RegisterSubscription(req) go RegisterSubscription(req)
@ -78,15 +76,15 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
go func() { go func() {
select { select {
case <-disconnected: case <-disconnected:
rch <- nil close(rch)
abort <- true abort <- true
req.Purge()
case <-finished: case <-finished:
} }
req.Purge()
}() }()
res := <-rch res, ok := <-rch
if res == nil { if !ok {
return return
} }

View File

@ -48,8 +48,8 @@ func LoadState() {
return return
} }
for queueName, meta := range state { for qname, meta := range state {
RegisterQueue(queueName, meta["wi"], meta["ri"]) RegisterQueue(qname, meta["wi"], meta["ri"])
} }
Log("State successfully loaded") Log("State successfully loaded")