From c1d828b22232f44d38fb68dfc265846e3eec8491 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Wed, 10 Sep 2014 16:52:03 +0400 Subject: [PATCH] Subscription --- hub/subscription.go | 43 +++++++++++++++++++++++++++++++++++++++++++ storage/storage.go | 20 ++++++++++++-------- 2 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 hub/subscription.go diff --git a/hub/subscription.go b/hub/subscription.go new file mode 100644 index 0000000..bd77ad5 --- /dev/null +++ b/hub/subscription.go @@ -0,0 +1,43 @@ +package hub + +type ( + Subscription struct { + Queue string + result chan<- []byte + done chan struct{} + } +) + +func NewSubscription(queue string, result chan<- []byte) *Subscription { + return &Subscription{ + Queue: queue, + result: result, + done: make(chan struct{}), + } +} + +func (s *Subscription) Send(msg []byte) bool { + success := make(chan bool) + + go func() { + defer func() { + if err := recover(); err != nil { + success <- false + } + }() + + s.result <- msg + success <- true + }() + + return <-success +} + +func (s *Subscription) Done() <-chan struct{} { + return s.done +} + +func (s *Subscription) Close() { + close(s.result) + close(s.done) +} diff --git a/storage/storage.go b/storage/storage.go index 63ee07a..1adf629 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -35,25 +35,29 @@ func New(path string) (s *Storage, err error) { return } -func (s *Storage) Get(queue string, abort <-chan struct{}) (message []byte, err error) { +func (s *Storage) Get(queue string) (message []byte, ok bool) { if _, ok := s.counters[queue]; !ok { - s.counters[queue] = newCounter(0, 0) + return + } + if size := s.counters[queue].distance(); size == 0 { + return } var index uint select { case index = <-s.counters[queue].stream: - case <-abort: + default: return } key := makeKey(queue, index) - if message, err = s.kyoto.Get(key); err != nil { - return - } + ok = true - if err = s.kyoto.Remove(key); err != nil { - return + if message, err := s.kyoto.Get(key); err != nil { + panic(err) + } + if err := s.kyoto.Remove(key); err != nil { + panic(err) } return