Subscription
This commit is contained in:
parent
601a80fb8e
commit
c1d828b222
|
@ -0,0 +1,43 @@
|
||||||
|
package hub
|
||||||
|
|
||||||
|
type (
|
||||||
|
Subscription struct {
|
||||||
|
Queue string
|
||||||
|
result chan<- []byte
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSubscription(queue string, result chan<- []byte) *Subscription {
|
||||||
|
return &Subscription{
|
||||||
|
Queue: queue,
|
||||||
|
result: result,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Subscription) Send(msg []byte) bool {
|
||||||
|
success := make(chan bool)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err != nil {
|
||||||
|
success <- false
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s.result <- msg
|
||||||
|
success <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
return <-success
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Subscription) Done() <-chan struct{} {
|
||||||
|
return s.done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Subscription) Close() {
|
||||||
|
close(s.result)
|
||||||
|
close(s.done)
|
||||||
|
}
|
|
@ -35,25 +35,29 @@ func New(path string) (s *Storage, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) Get(queue string, abort <-chan struct{}) (message []byte, err error) {
|
func (s *Storage) Get(queue string) (message []byte, ok bool) {
|
||||||
if _, ok := s.counters[queue]; !ok {
|
if _, ok := s.counters[queue]; !ok {
|
||||||
s.counters[queue] = newCounter(0, 0)
|
return
|
||||||
|
}
|
||||||
|
if size := s.counters[queue].distance(); size == 0 {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var index uint
|
var index uint
|
||||||
select {
|
select {
|
||||||
case index = <-s.counters[queue].stream:
|
case index = <-s.counters[queue].stream:
|
||||||
case <-abort:
|
default:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
key := makeKey(queue, index)
|
key := makeKey(queue, index)
|
||||||
if message, err = s.kyoto.Get(key); err != nil {
|
ok = true
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = s.kyoto.Remove(key); err != nil {
|
if message, err := s.kyoto.Get(key); err != nil {
|
||||||
return
|
panic(err)
|
||||||
|
}
|
||||||
|
if err := s.kyoto.Remove(key); err != nil {
|
||||||
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue