1
0
Fork 0

Return publishing success

This commit is contained in:
Gregory Eremin 2014-07-17 00:47:08 +07:00
parent 7e7fea7010
commit a84f1241a9
2 changed files with 22 additions and 9 deletions

View File

@ -12,12 +12,20 @@ var (
queues = make(map[string]*Queue) queues = make(map[string]*Queue)
) )
func (q *Queue) Push(msg Message) { func (q *Queue) Push(msg Message) bool {
p := Payload{ var err error
Queue: q,
Message: msg, q.Counter.Write(func(i uint) bool {
} key := NewKey(q.Name, i)
Persist(p) err = storage.Set(key, msg)
if err != nil {
Error(err, "Failed to write %d bytes to record '%s'", len(msg), key)
}
return (err == nil)
})
return (err == nil)
} }
func (q *Queue) TryFetch() (Message, bool) { func (q *Queue) TryFetch() (Message, bool) {

View File

@ -44,10 +44,15 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) {
} }
queueName := r.FormValue("queue") queueName := r.FormValue("queue")
go Register(queueName, msg) ok := Register(queueName, msg)
Debug("Published message of %d bytes to queue %s", len(msg), queueName) if ok {
w.Write([]byte("OK")) Debug("Published message of %d bytes to queue %s", len(msg), queueName)
w.Write([]byte("OK"))
} else {
Debug("Failed to publish message of %d bytes to queue %s", len(msg), queueName)
http.Error(w, "FAIL", 500)
}
} }
func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {