Contextless hub
This commit is contained in:
		
							parent
							
								
									c1d828b222
								
							
						
					
					
						commit
						69b94c077f
					
				
							
								
								
									
										40
									
								
								hub/hub.go
									
									
									
									
									
								
							
							
						
						
									
										40
									
								
								hub/hub.go
									
									
									
									
									
								
							| @ -1,27 +1,47 @@ | ||||
| package hub | ||||
| 
 | ||||
| import ( | ||||
| 	"code.google.com/p/go.net/context" | ||||
| 	"github.com/KosyanMedia/burlesque/storage" | ||||
| ) | ||||
| 
 | ||||
| type ( | ||||
| 	Hub struct { | ||||
| 		storage     *storage.Storage | ||||
| 		subscribers []*context.Context | ||||
| 		subscribers []*Subscription | ||||
| 	} | ||||
| ) | ||||
| 
 | ||||
| func New() (h *Hub) { | ||||
| 	h = Hub{} | ||||
| 
 | ||||
| 	return | ||||
| func New(st *storage.Storage) *Hub { | ||||
| 	return &Hub{ | ||||
| 		storage:     st, | ||||
| 		subscribers: []*Subscription{}, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (h *Hub) Pub(ctx context.Context) context.Context { | ||||
| 	return ctx | ||||
| func (h *Hub) Pub(queue string, msg []byte) bool { | ||||
| 	for _, s := range h.subscribers { | ||||
| 		if s.Queue == queue { | ||||
| 			select { | ||||
| 			case <-s.Done(): | ||||
| 				continue | ||||
| 			default: | ||||
| 			} | ||||
| 
 | ||||
| 			if ok := s.Send(msg); ok { | ||||
| 				return true | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	err := h.storage.Put(queue, msg) | ||||
| 
 | ||||
| 	return (err == nil) | ||||
| } | ||||
| 
 | ||||
| func (h *Hub) Sub(ctx context.Context) context.Context { | ||||
| 	return ctx | ||||
| func (h *Hub) Sub(s *Subscription) { | ||||
| 	if msg, ok := h.storage.Get(s.Queue); ok { | ||||
| 		s.Send(msg) | ||||
| 	} else { | ||||
| 		h.subscribers = append(h.subscribers, s) | ||||
| 	} | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user