diff --git a/config.go b/config.go index e1b10e1..f25c6e0 100644 --- a/config.go +++ b/config.go @@ -5,7 +5,7 @@ import ( ) const ( - // DefaultProductionStorage = "burlesque.kch#opts=c#zcomp=gz#msiz=524288000" + // With compression: burlesque.kch#opts=c#zcomp=gz#msiz=524288000 DefaultProductionStorage = "burlesque.kch#msiz=524288000" ) diff --git a/counter.go b/counter.go index 992493a..7e8748a 100644 --- a/counter.go +++ b/counter.go @@ -29,7 +29,7 @@ func NewCounter(wi, ri uint) *Counter { c := &Counter{ WriteIndex: wi, ReadIndex: ri, - stream: make(chan uint), + Read: make(chan uint), streaming: sync.NewCond(m), } @@ -62,7 +62,7 @@ func (c *Counter) Stream() { c.streaming.Wait() } - c.stream <- c.ReadIndex + 1 + c.Read <- c.ReadIndex + 1 c.ReadIndex++ } } diff --git a/request.go b/request.go index bce61d5..7e25bd2 100644 --- a/request.go +++ b/request.go @@ -1,7 +1,7 @@ package main import ( - "reflect" + "sync" ) type ( @@ -9,6 +9,7 @@ type ( Queues []string Callback func(*Response) Abort chan bool + Dead bool } Response struct { Queue string @@ -17,27 +18,32 @@ type ( ) var ( - pool = []*Request{} + pool struct { + Requests []*Request + mutex sync.Mutex + } ) -func Register(q string, msg Message) bool { - for i, r := range pool { +func RegisterPublication(q string, msg Message) bool { + pool.mutex.Lock() + for i, r := range pool.Requests { for _, queueName := range r.Queues { if queueName == q { go r.Callback(&Response{Queue: queueName, Message: msg}) - pool = append(pool[:i], pool[i+1:]...) - return + pool.Requests = append(pool.Requests[:i], pool.Requests[i+1:]...) + defer pool.mutex.Unlock() return true } } } + pool.mutex.Unlock() ok := GetQueue(q).Push(msg) return ok } -func Process(r *Request) { +func RegisterSubscription(r *Request) { for _, queueName := range r.Queues { q := GetQueue(queueName) msg, ok := q.TryFetch(r.Abort) @@ -46,14 +52,22 @@ func Process(r *Request) { return } } - pool = append(pool, r) + + pool.mutex.Lock() + pool.Requests = append(pool.Requests, r) + pool.mutex.Unlock() } -func Purge(r *Request) { - for i, req := range pool { - if reflect.ValueOf(r).Pointer() == reflect.ValueOf(req).Pointer() { - pool = append(pool[:i], pool[i+1:]...) - return +func (r *Request) Purge() { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + r.Dead = true + deleted := 0 + for i, req := range pool.Requests { + if req.Dead { + pool.Requests = append(pool.Requests[:i-deleted], pool.Requests[i-deleted+1:]...) + deleted++ } } } diff --git a/server.go b/server.go index db06cde..38d7180 100644 --- a/server.go +++ b/server.go @@ -18,7 +18,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { } } - for _, r := range pool { + for _, r := range pool.Requests { for _, q := range r.Queues { info[q]["subscriptions"]++ } @@ -44,7 +44,7 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) { } queueName := r.FormValue("queue") - ok := Register(queueName, msg) + ok := RegisterPublication(queueName, msg) if ok { Debug("Published message of %d bytes to queue %s", len(msg), queueName) @@ -65,7 +65,7 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { }, Abort: abort, } - go Process(req) + go RegisterSubscription(req) disconnected := w.(http.CloseNotifier).CloseNotify() finished := make(chan bool) @@ -74,9 +74,9 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { case <-disconnected: rch <- nil abort <- true + req.Purge() case <-finished: } - Purge(req) }() res := <-rch