1
0
Fork 0

Fix request pool

This commit is contained in:
Gregory Eremin 2014-07-17 01:57:54 +07:00
parent 1c62c6839b
commit 0339b45090
4 changed files with 34 additions and 20 deletions

View File

@ -5,7 +5,7 @@ import (
) )
const ( const (
// DefaultProductionStorage = "burlesque.kch#opts=c#zcomp=gz#msiz=524288000" // With compression: burlesque.kch#opts=c#zcomp=gz#msiz=524288000
DefaultProductionStorage = "burlesque.kch#msiz=524288000" DefaultProductionStorage = "burlesque.kch#msiz=524288000"
) )

View File

@ -29,7 +29,7 @@ func NewCounter(wi, ri uint) *Counter {
c := &Counter{ c := &Counter{
WriteIndex: wi, WriteIndex: wi,
ReadIndex: ri, ReadIndex: ri,
stream: make(chan uint), Read: make(chan uint),
streaming: sync.NewCond(m), streaming: sync.NewCond(m),
} }
@ -62,7 +62,7 @@ func (c *Counter) Stream() {
c.streaming.Wait() c.streaming.Wait()
} }
c.stream <- c.ReadIndex + 1 c.Read <- c.ReadIndex + 1
c.ReadIndex++ c.ReadIndex++
} }
} }

View File

@ -1,7 +1,7 @@
package main package main
import ( import (
"reflect" "sync"
) )
type ( type (
@ -9,6 +9,7 @@ type (
Queues []string Queues []string
Callback func(*Response) Callback func(*Response)
Abort chan bool Abort chan bool
Dead bool
} }
Response struct { Response struct {
Queue string Queue string
@ -17,27 +18,32 @@ type (
) )
var ( var (
pool = []*Request{} pool struct {
Requests []*Request
mutex sync.Mutex
}
) )
func Register(q string, msg Message) bool { func RegisterPublication(q string, msg Message) bool {
for i, r := range pool { pool.mutex.Lock()
for i, r := range pool.Requests {
for _, queueName := range r.Queues { for _, queueName := range r.Queues {
if queueName == q { if queueName == q {
go r.Callback(&Response{Queue: queueName, Message: msg}) go r.Callback(&Response{Queue: queueName, Message: msg})
pool = append(pool[:i], pool[i+1:]...) pool.Requests = append(pool.Requests[:i], pool.Requests[i+1:]...)
return defer pool.mutex.Unlock()
return true return true
} }
} }
} }
pool.mutex.Unlock()
ok := GetQueue(q).Push(msg) ok := GetQueue(q).Push(msg)
return ok return ok
} }
func Process(r *Request) { func RegisterSubscription(r *Request) {
for _, queueName := range r.Queues { for _, queueName := range r.Queues {
q := GetQueue(queueName) q := GetQueue(queueName)
msg, ok := q.TryFetch(r.Abort) msg, ok := q.TryFetch(r.Abort)
@ -46,14 +52,22 @@ func Process(r *Request) {
return return
} }
} }
pool = append(pool, r)
pool.mutex.Lock()
pool.Requests = append(pool.Requests, r)
pool.mutex.Unlock()
} }
func Purge(r *Request) { func (r *Request) Purge() {
for i, req := range pool { pool.mutex.Lock()
if reflect.ValueOf(r).Pointer() == reflect.ValueOf(req).Pointer() { defer pool.mutex.Unlock()
pool = append(pool[:i], pool[i+1:]...)
return r.Dead = true
deleted := 0
for i, req := range pool.Requests {
if req.Dead {
pool.Requests = append(pool.Requests[:i-deleted], pool.Requests[i-deleted+1:]...)
deleted++
} }
} }
} }

View File

@ -18,7 +18,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
for _, r := range pool { for _, r := range pool.Requests {
for _, q := range r.Queues { for _, q := range r.Queues {
info[q]["subscriptions"]++ info[q]["subscriptions"]++
} }
@ -44,7 +44,7 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) {
} }
queueName := r.FormValue("queue") queueName := r.FormValue("queue")
ok := Register(queueName, msg) ok := RegisterPublication(queueName, msg)
if ok { if ok {
Debug("Published message of %d bytes to queue %s", len(msg), queueName) Debug("Published message of %d bytes to queue %s", len(msg), queueName)
@ -65,7 +65,7 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
}, },
Abort: abort, Abort: abort,
} }
go Process(req) go RegisterSubscription(req)
disconnected := w.(http.CloseNotifier).CloseNotify() disconnected := w.(http.CloseNotifier).CloseNotify()
finished := make(chan bool) finished := make(chan bool)
@ -74,9 +74,9 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
case <-disconnected: case <-disconnected:
rch <- nil rch <- nil
abort <- true abort <- true
req.Purge()
case <-finished: case <-finished:
} }
Purge(req)
}() }()
res := <-rch res := <-rch