diff --git a/hub/subscription.go b/hub/subscription.go index e0a08c0..38b79dc 100644 --- a/hub/subscription.go +++ b/hub/subscription.go @@ -3,7 +3,7 @@ package hub type ( Subscription struct { Queues []string - result chan<- Result + result chan Result done chan struct{} } Result struct { @@ -12,10 +12,10 @@ type ( } ) -func NewSubscription(queues []string, result chan<- Result) *Subscription { +func NewSubscription(queues []string) *Subscription { return &Subscription{ Queues: queues, - result: result, + result: make(chan Result), done: make(chan struct{}), } } @@ -47,10 +47,15 @@ func (s *Subscription) Send(res Result) bool { return <-success } +func (s *Subscription) Result() <-chan Result { + return s.result +} + func (s *Subscription) Done() <-chan struct{} { return s.done } func (s *Subscription) Close() { close(s.done) + close(s.result) } diff --git a/server/server.go b/server/server.go index 2d80619..b561572 100644 --- a/server/server.go +++ b/server/server.go @@ -75,11 +75,8 @@ func (s *Server) pubHandler(w http.ResponseWriter, r *http.Request) { } func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) { - result := make(chan hub.Result) queues := strings.Split(r.FormValue("queues"), ",") - - sub := hub.NewSubscription(queues, result) - defer sub.Close() + sub := hub.NewSubscription(queues) finished := make(chan struct{}) defer close(finished) @@ -88,14 +85,15 @@ func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) { go func() { select { case <-disconnected: - sub.Close() case <-finished: } + sub.Close() }() go s.hub.Sub(sub) - res := <-result - w.Header().Set("Queue", res.Queue) - w.Write(res.Message) + if res, ok := <-sub.Result(); ok { + w.Header().Set("Queue", res.Queue) + w.Write(res.Message) + } }