1
0
Fork 0

Hide result channel into subscription

This commit is contained in:
Gregory Eremin 2014-09-24 16:26:05 +04:00
parent a73ec0887c
commit f29268776b
2 changed files with 14 additions and 11 deletions

View File

@ -3,7 +3,7 @@ package hub
type (
Subscription struct {
Queues []string
result chan<- Result
result chan Result
done chan struct{}
}
Result struct {
@ -12,10 +12,10 @@ type (
}
)
func NewSubscription(queues []string, result chan<- Result) *Subscription {
func NewSubscription(queues []string) *Subscription {
return &Subscription{
Queues: queues,
result: result,
result: make(chan Result),
done: make(chan struct{}),
}
}
@ -47,10 +47,15 @@ func (s *Subscription) Send(res Result) bool {
return <-success
}
func (s *Subscription) Result() <-chan Result {
return s.result
}
func (s *Subscription) Done() <-chan struct{} {
return s.done
}
func (s *Subscription) Close() {
close(s.done)
close(s.result)
}

View File

@ -75,11 +75,8 @@ func (s *Server) pubHandler(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) {
result := make(chan hub.Result)
queues := strings.Split(r.FormValue("queues"), ",")
sub := hub.NewSubscription(queues, result)
defer sub.Close()
sub := hub.NewSubscription(queues)
finished := make(chan struct{})
defer close(finished)
@ -88,14 +85,15 @@ func (s *Server) subHandler(w http.ResponseWriter, r *http.Request) {
go func() {
select {
case <-disconnected:
sub.Close()
case <-finished:
}
sub.Close()
}()
go s.hub.Sub(sub)
res := <-result
w.Header().Set("Queue", res.Queue)
w.Write(res.Message)
if res, ok := <-sub.Result(); ok {
w.Header().Set("Queue", res.Queue)
w.Write(res.Message)
}
}