Use logging macros
This commit is contained in:
parent
51bc5b04e1
commit
313677aa48
34
main.go
34
main.go
|
@ -2,31 +2,13 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/stvp/rollbar"
|
"github.com/stvp/rollbar"
|
||||||
logpkg "log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
|
||||||
Message []byte
|
|
||||||
Key []byte
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewKey(queue string, index uint) Key {
|
|
||||||
istr := strconv.FormatUint(uint64(index), 10)
|
|
||||||
key := strings.Join([]string{queue, istr}, "_")
|
|
||||||
return Key(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
log *logpkg.Logger
|
|
||||||
)
|
|
||||||
|
|
||||||
func HandleShutdown() {
|
func HandleShutdown() {
|
||||||
ch := make(chan os.Signal)
|
ch := make(chan os.Signal)
|
||||||
signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
|
signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
|
||||||
|
@ -34,21 +16,17 @@ func HandleShutdown() {
|
||||||
go func() {
|
go func() {
|
||||||
<-ch
|
<-ch
|
||||||
SaveState()
|
SaveState()
|
||||||
log.Printf("State successfully persisted")
|
Log("State successfully persisted")
|
||||||
storage.Close()
|
storage.Close()
|
||||||
rollbar.Wait()
|
rollbar.Wait()
|
||||||
log.Println("Storage closed")
|
Log("Storage closed")
|
||||||
log.Printf("Server stopped")
|
Log("Server stopped")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log = logpkg.New(os.Stdout, "", logpkg.Ldate|logpkg.Lmicroseconds)
|
SetupLogging()
|
||||||
|
|
||||||
rollbar.Token = "***REMOVED***" // klit access token
|
|
||||||
rollbar.Environment = cfg.Env
|
|
||||||
|
|
||||||
SetupConfig()
|
SetupConfig()
|
||||||
SetupStorage()
|
SetupStorage()
|
||||||
SetupServer()
|
SetupServer()
|
||||||
|
@ -57,8 +35,8 @@ func main() {
|
||||||
go KeepStatePersisted()
|
go KeepStatePersisted()
|
||||||
go PersistMessages()
|
go PersistMessages()
|
||||||
|
|
||||||
log.Printf("GOMAXPROCS = %d", runtime.GOMAXPROCS(-1))
|
Log("GOMAXPROCS = %d", runtime.GOMAXPROCS(-1))
|
||||||
log.Printf("Starting HTTP server on port %d", cfg.Port)
|
Log("Starting HTTP server on port %d", cfg.Port)
|
||||||
|
|
||||||
http.ListenAndServe(cfg.PortString(), nil)
|
http.ListenAndServe(cfg.PortString(), nil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,12 +35,9 @@ func Register(q string, msg Message) {
|
||||||
func Process(r *Request) {
|
func Process(r *Request) {
|
||||||
for _, queueName := range r.Queues {
|
for _, queueName := range r.Queues {
|
||||||
q := GetQueue(queueName)
|
q := GetQueue(queueName)
|
||||||
if q.Counter.Distance() > 0 {
|
msg, ok := q.TryFetch()
|
||||||
if msg, err := q.Fetch(); err != nil {
|
if ok {
|
||||||
go r.Callback(nil)
|
go r.Callback(&Response{Queue: queueName, Message: msg})
|
||||||
} else {
|
|
||||||
go r.Callback(&Response{Queue: queueName, Message: msg})
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
queueName := r.FormValue("queue")
|
queueName := r.FormValue("queue")
|
||||||
go Register(queueName, msg)
|
go Register(queueName, msg)
|
||||||
|
|
||||||
log.Println("Published message of", len(msg), "bytes to queue", queueName)
|
Log("Published message of %d bytes to queue %s", len(msg), queueName)
|
||||||
w.Write([]byte("OK"))
|
w.Write([]byte("OK"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Queue", res.Queue)
|
w.Header().Set("Queue", res.Queue)
|
||||||
w.Write(res.Message)
|
w.Write(res.Message)
|
||||||
|
|
||||||
log.Println("Recieved message of", len(res.Message), "bytes from queue", res.Queue)
|
Log("Recieved message of %d bytes from queue %s", len(res.Message), res.Queue)
|
||||||
finished <- true
|
finished <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
11
state.go
11
state.go
|
@ -2,7 +2,6 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/stvp/rollbar"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,8 +27,7 @@ func SaveState() {
|
||||||
stateJson, _ := json.Marshal(state)
|
stateJson, _ := json.Marshal(state)
|
||||||
key := Key(StateMetaKey)
|
key := Key(StateMetaKey)
|
||||||
if err := storage.Set(key, stateJson); err != nil {
|
if err := storage.Set(key, stateJson); err != nil {
|
||||||
rollbar.Error("error", err)
|
Error(err, "Failed to persist state")
|
||||||
log.Printf("Failed to persist state")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,13 +38,12 @@ func LoadState() {
|
||||||
|
|
||||||
stateJson, err := storage.Get(key)
|
stateJson, err := storage.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("State not found")
|
Log("State not found")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.Unmarshal(stateJson, &state); err != nil {
|
if err := json.Unmarshal(stateJson, &state); err != nil {
|
||||||
rollbar.Error("error", err)
|
Log("Failed to load state")
|
||||||
log.Printf("Failed to load state")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +51,7 @@ func LoadState() {
|
||||||
RegisterQueue(queueName, meta["wi"], meta["ri"])
|
RegisterQueue(queueName, meta["wi"], meta["ri"])
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("State successfully loaded")
|
Log("State successfully loaded")
|
||||||
}
|
}
|
||||||
|
|
||||||
func KeepStatePersisted() {
|
func KeepStatePersisted() {
|
||||||
|
|
13
storage.go
13
storage.go
|
@ -2,10 +2,13 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ezotrank/cabinetgo"
|
"github.com/ezotrank/cabinetgo"
|
||||||
"github.com/stvp/rollbar"
|
"strconv"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
Message []byte
|
||||||
|
Key []byte
|
||||||
Payload struct {
|
Payload struct {
|
||||||
Queue *Queue
|
Queue *Queue
|
||||||
Message Message
|
Message Message
|
||||||
|
@ -17,10 +20,16 @@ var (
|
||||||
saver = make(chan Payload, 1000)
|
saver = make(chan Payload, 1000)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func NewKey(queue string, index uint) Key {
|
||||||
|
istr := strconv.FormatUint(uint64(index), 10)
|
||||||
|
key := strings.Join([]string{queue, istr}, "_")
|
||||||
|
return Key(key)
|
||||||
|
}
|
||||||
|
|
||||||
func SetupStorage() {
|
func SetupStorage() {
|
||||||
err := storage.Open(cfg.Storage, cabinet.KCOWRITER|cabinet.KCOCREATE)
|
err := storage.Open(cfg.Storage, cabinet.KCOWRITER|cabinet.KCOCREATE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
Error(err, "Failed to open database '%s'", cfg.Storage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue