From a84f1241a90b597c697a629e41e6151d90c91865 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Thu, 17 Jul 2014 00:47:08 +0700 Subject: [PATCH] Return publishing success --- queue.go | 20 ++++++++++++++------ server.go | 11 ++++++++--- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/queue.go b/queue.go index bbbd6ea..0eb4a87 100644 --- a/queue.go +++ b/queue.go @@ -12,12 +12,20 @@ var ( queues = make(map[string]*Queue) ) -func (q *Queue) Push(msg Message) { - p := Payload{ - Queue: q, - Message: msg, - } - Persist(p) +func (q *Queue) Push(msg Message) bool { + var err error + + q.Counter.Write(func(i uint) bool { + key := NewKey(q.Name, i) + err = storage.Set(key, msg) + if err != nil { + Error(err, "Failed to write %d bytes to record '%s'", len(msg), key) + } + + return (err == nil) + }) + + return (err == nil) } func (q *Queue) TryFetch() (Message, bool) { diff --git a/server.go b/server.go index bd788b3..9d0d45a 100644 --- a/server.go +++ b/server.go @@ -44,10 +44,15 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) { } queueName := r.FormValue("queue") - go Register(queueName, msg) + ok := Register(queueName, msg) - Debug("Published message of %d bytes to queue %s", len(msg), queueName) - w.Write([]byte("OK")) + if ok { + Debug("Published message of %d bytes to queue %s", len(msg), queueName) + w.Write([]byte("OK")) + } else { + Debug("Failed to publish message of %d bytes to queue %s", len(msg), queueName) + http.Error(w, "FAIL", 500) + } } func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {