Adopt community codestyle

This commit is contained in:
Gregory Eremin 2014-09-09 12:20:40 +04:00
parent 56c1ec2300
commit 49acb0076f
9 changed files with 177 additions and 175 deletions

View File

@ -5,14 +5,14 @@ import (
) )
var ( var (
Config struct { config struct {
Storage string storage string
Port int port int
} }
) )
func SetupConfig() { func setupConfig() {
flag.StringVar(&Config.Storage, "storage", "-", "Kyoto Cabinet storage path (e.g. burlesque.kch#dfunit=8#msiz=512M)") flag.StringVar(&config.storage, "storage", "-", "Kyoto Cabinet storage path (e.g. burlesque.kch#dfunit=8#msiz=512M)")
flag.IntVar(&Config.Port, "port", 4401, "Server HTTP port") flag.IntVar(&config.port, "port", 4401, "Server HTTP port")
flag.Parse() flag.Parse()
} }

View File

@ -5,64 +5,64 @@ import (
) )
const ( const (
MaxIndex = ^uint(0) maxIndex = ^uint(0)
) )
type ( type (
// Counter is responsible for operating queue read and write indexes // Counter is responsible for operating queue read and write indexes
Counter struct { counter struct {
WriteIndex uint // Number of the record last written to the queue writeIndex uint // Number of the record last written to the queue
ReadIndex uint // Number of the record last read from the queue readIndex uint // Number of the record last read from the queue
// If WriteIndex is greater than ReadIndex then there are unread messages // If WriteIndex is greater than ReadIndex then there are unread messages
// If WriteIndex is less tham ReadIndex then MaxIndex was reached // If WriteIndex is less tham ReadIndex then MaxIndex was reached
Read chan uint read chan uint
mutex sync.Mutex mutex sync.Mutex
streaming *sync.Cond streaming *sync.Cond
} }
) )
func NewCounter(wi, ri uint) *Counter { func newCounter(wi, ri uint) *counter {
m := &sync.Mutex{} m := &sync.Mutex{}
m.Lock() m.Lock()
c := &Counter{ c := &counter{
WriteIndex: wi, writeIndex: wi,
ReadIndex: ri, readIndex: ri,
Read: make(chan uint), read: make(chan uint),
streaming: sync.NewCond(m), streaming: sync.NewCond(m),
} }
go c.Stream() go c.stream()
return c return c
} }
func (c *Counter) Write(proc func(i uint) bool) { func (c *counter) write(proc func(i uint) bool) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
ok := proc(c.WriteIndex + 1) ok := proc(c.writeIndex + 1)
if ok { if ok {
c.WriteIndex++ c.writeIndex++
c.streaming.Signal() c.streaming.Signal()
} }
} }
func (c *Counter) Distance() uint { func (c *counter) distance() uint {
d := c.WriteIndex - c.ReadIndex d := c.writeIndex - c.readIndex
if d < 0 { if d < 0 {
d += MaxIndex d += maxIndex
} }
return d return d
} }
func (c *Counter) Stream() { func (c *counter) stream() {
for { for {
if c.Distance() == 0 { if c.distance() == 0 {
c.streaming.Wait() c.streaming.Wait()
} }
c.Read <- c.ReadIndex + 1 c.read <- c.readIndex + 1
c.ReadIndex++ c.readIndex++
} }
} }

View File

@ -1,29 +1,29 @@
package main package main
import ( import (
"log" loglib "log"
"os" "os"
"runtime" "runtime"
) )
var ( var (
logger *log.Logger logger *loglib.Logger
) )
func SetupLogging() { func setupLogging() {
logger = log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds) logger = loglib.New(os.Stdout, "", loglib.Ldate|loglib.Lmicroseconds)
Log("Burlesque v%s started", Version) log("Burlesque v%s started", version)
Log("GOMAXPROCS is set to %d", runtime.GOMAXPROCS(-1)) log("GOMAXPROCS is set to %d", runtime.GOMAXPROCS(-1))
Log("Storage path: %s", Config.Storage) log("Storage path: %s", config.storage)
Log("Server is running at http://127.0.0.1:%d", Config.Port) log("Server is running at http://127.0.0.1:%d", config.port)
} }
func Log(format string, args ...interface{}) { func log(format string, args ...interface{}) {
logger.Printf("[INFO] "+format, args...) logger.Printf("[INFO] "+format, args...)
} }
func Error(err error, format string, args ...interface{}) { func alert(err error, format string, args ...interface{}) {
logger.Printf("[ERROR] "+format, args...) logger.Printf("[ERROR] "+format, args...)
logger.Printf(" %s", err.Error()) logger.Printf(" %s", err.Error())
} }

28
main.go
View File

@ -7,33 +7,33 @@ import (
) )
const ( const (
Version = "0.1.3" version = "0.1.3"
) )
func HandleShutdown() { func handleShutdown() {
ch := make(chan os.Signal) ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) signal.Notify(ch, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
go func() { go func() {
<-ch <-ch
SaveState() saveState()
Log("State successfully persisted") log("State successfully persisted")
CloseStorage() closeStorage()
Log("Stopped") log("Stopped")
os.Exit(0) os.Exit(0)
}() }()
} }
func main() { func main() {
SetupConfig() setupConfig()
SetupLogging() setupLogging()
SetupStorage() setupStorage()
SetupServer() setupServer()
HandleShutdown() handleShutdown()
LoadState() loadState()
go KeepStatePersisted() go keepStatePersisted()
StartServer() startServer()
} }

View File

@ -1,24 +1,24 @@
package main package main
type ( type (
Queue struct { queue struct {
Name string name string
Counter *Counter counter *counter
} }
) )
var ( var (
queues = make(map[string]*Queue) queues = make(map[string]*queue)
) )
func (q *Queue) Push(msg Message) bool { func (q *queue) push(msg message) bool {
var err error var err error
q.Counter.Write(func(i uint) bool { q.counter.write(func(i uint) bool {
key := NewKey(q.Name, i) key := newKey(q.name, i)
err = storage.Set(key, msg) err = storage.Set(key, msg)
if err != nil { if err != nil {
Error(err, "Failed to write %d bytes to record '%s'", len(msg), key) alert(err, "Failed to write %d bytes to record '%s'", len(msg), key)
} }
return (err == nil) return (err == nil)
@ -27,49 +27,49 @@ func (q *Queue) Push(msg Message) bool {
return (err == nil) return (err == nil)
} }
func (q *Queue) TryFetch(abort chan bool) (Message, bool) { func (q *queue) tryFetch(abort chan bool) (message, bool) {
if q.Counter.Distance() > 0 { if q.counter.distance() > 0 {
return q.Fetch(abort) return q.fetch(abort)
} else { } else {
return Message{}, false return message{}, false
} }
} }
func (q *Queue) Fetch(abort chan bool) (Message, bool) { func (q *queue) fetch(abort chan bool) (message, bool) {
var i uint var i uint
select { select {
case i = <-q.Counter.Read: case i = <-q.counter.read:
case <-abort: case <-abort:
return Message{}, false return message{}, false
} }
key := NewKey(q.Name, i) k := newKey(q.name, i)
msg, err := storage.Get(key) msg, err := storage.Get(k)
if err != nil { if err != nil {
Error(err, "Failed to read record '%s'", key) alert(err, "Failed to read record '%s'", k)
return msg, false return msg, false
} }
err = storage.Remove(key) err = storage.Remove(k)
if err != nil { if err != nil {
Error(err, "Failed to delete record '%s'", key) alert(err, "Failed to delete record '%s'", k)
return msg, false return msg, false
} }
return msg, true return msg, true
} }
func GetQueue(name string) *Queue { func getQueue(name string) *queue {
if _, ok := queues[name]; !ok { if _, ok := queues[name]; !ok {
RegisterQueue(name, 0, 0) registerQueue(name, 0, 0)
} }
return queues[name] return queues[name]
} }
func RegisterQueue(name string, wi, ri uint) { func registerQueue(name string, wi, ri uint) {
queues[name] = &Queue{ queues[name] = &queue{
Name: name, name: name,
Counter: NewCounter(wi, ri), counter: newCounter(wi, ri),
} }
} }

View File

@ -5,34 +5,34 @@ import (
) )
type ( type (
Request struct { request struct {
Queues []string queues []string
ResponseCh chan Response responseCh chan response
Abort chan bool abort chan bool
Dead bool dead bool
} }
Response struct { response struct {
Queue string queue string
Message Message message message
} }
) )
var ( var (
pool struct { pool struct {
Requests []*Request requests []*request
mutex sync.Mutex mutex sync.Mutex
} }
) )
func RegisterPublication(q string, msg Message) bool { func registerPublication(q string, msg message) bool {
for _, r := range pool.Requests { for _, r := range pool.requests {
if r.Dead { if r.dead {
continue continue
} }
for _, qname := range r.Queues { for _, qname := range r.queues {
if qname == q { if qname == q {
rsp := Response{Queue: q, Message: msg} rsp := response{queue: q, message: msg}
ok := r.TryRespond(rsp) ok := r.tryRespond(rsp)
if ok { if ok {
return true return true
} }
@ -40,41 +40,41 @@ func RegisterPublication(q string, msg Message) bool {
} }
} }
ok := GetQueue(q).Push(msg) ok := getQueue(q).push(msg)
return ok return ok
} }
func RegisterSubscription(r *Request) { func registerSubscription(r *request) {
for _, qname := range r.Queues { for _, qname := range r.queues {
q := GetQueue(qname) q := getQueue(qname)
msg, ok := q.TryFetch(r.Abort) msg, ok := q.tryFetch(r.abort)
if ok { if ok {
rsp := Response{Queue: qname, Message: msg} rsp := response{queue: qname, message: msg}
ok := r.TryRespond(rsp) ok := r.tryRespond(rsp)
if !ok { if !ok {
q.Push(msg) q.push(msg)
} }
return return
} }
} }
pool.Requests = append(pool.Requests, r) pool.requests = append(pool.requests, r)
} }
func (r *Request) TryRespond(rsp Response) bool { func (r *request) tryRespond(rsp response) bool {
okch := make(chan bool) okch := make(chan bool)
go func() { go func() {
defer func() { defer func() {
err := recover() err := recover()
if err != nil { // Panic! if err != nil { // Panic!
r.Dead = true r.dead = true
okch <- false okch <- false
} }
}() }()
r.ResponseCh <- rsp // If channel is already closed expect a panic r.responseCh <- rsp // If channel is already closed expect a panic
okch <- true okch <- true
}() }()
@ -82,15 +82,15 @@ func (r *Request) TryRespond(rsp Response) bool {
return ok return ok
} }
func (r *Request) Purge() { func (r *request) purge() {
pool.mutex.Lock() pool.mutex.Lock()
defer pool.mutex.Unlock() defer pool.mutex.Unlock()
r.Dead = true r.dead = true
deleted := 0 deleted := 0
for i, req := range pool.Requests { for i, req := range pool.requests {
if req.Dead { if req.dead {
pool.Requests = append(pool.Requests[:i-deleted], pool.Requests[i-deleted+1:]...) pool.requests = append(pool.requests[:i-deleted], pool.requests[i-deleted+1:]...)
deleted++ deleted++
} }
} }

View File

@ -10,26 +10,26 @@ import (
"strings" "strings"
) )
func StartServer() { func startServer() {
port := fmt.Sprintf(":%d", Config.Port) port := fmt.Sprintf(":%d", config.port)
err := http.ListenAndServe(port, nil) err := http.ListenAndServe(port, nil)
if err != nil { if err != nil {
Error(err, "Error starting server on port %d", Config.Port) alert(err, "Error starting server on port %d", config.port)
} }
} }
func StatusHandler(w http.ResponseWriter, r *http.Request) { func statusHandler(w http.ResponseWriter, r *http.Request) {
info := make(map[string]map[string]uint) info := make(map[string]map[string]uint)
for _, q := range queues { for _, q := range queues {
info[q.Name] = map[string]uint{ info[q.name] = map[string]uint{
"messages": q.Counter.Distance(), "messages": q.counter.distance(),
"subscriptions": 0, "subscriptions": 0,
} }
} }
for _, r := range pool.Requests { for _, r := range pool.requests {
for _, q := range r.Queues { for _, q := range r.queues {
info[q]["subscriptions"]++ info[q]["subscriptions"]++
} }
} }
@ -38,14 +38,14 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
w.Write(jsn) w.Write(jsn)
} }
func DebugHandler(w http.ResponseWriter, r *http.Request) { func debugHandler(w http.ResponseWriter, r *http.Request) {
info := make(map[string]interface{}) info := make(map[string]interface{})
info["version"] = Version info["version"] = version
info["goroutines"] = runtime.NumGoroutine() info["goroutines"] = runtime.NumGoroutine()
s, err := storage.Status() s, err := storage.Status()
if err != nil { if err != nil {
Error(err, "Failed to get Kyoto Cabinet status") alert(err, "Failed to get Kyoto Cabinet status")
} }
s = s[:len(s)-1] // Removing trailing new line s = s[:len(s)-1] // Removing trailing new line
@ -66,15 +66,15 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
w.Write(jsn) w.Write(jsn)
} }
func PublishHandler(w http.ResponseWriter, r *http.Request) { func publishHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() defer r.Body.Close()
msg, _ := ioutil.ReadAll(r.Body) msg, _ := ioutil.ReadAll(r.Body)
if len(msg) == 0 { if len(msg) == 0 {
msg = Message(r.FormValue("msg")) msg = message(r.FormValue("msg"))
} }
qname := r.FormValue("queue") qname := r.FormValue("queue")
ok := RegisterPublication(qname, msg) ok := registerPublication(qname, msg)
if ok { if ok {
w.Write([]byte("OK")) w.Write([]byte("OK"))
@ -83,15 +83,15 @@ func PublishHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
func SubscriptionHandler(w http.ResponseWriter, r *http.Request) { func subscriptionHandler(w http.ResponseWriter, r *http.Request) {
rch := make(chan Response) rch := make(chan response)
abort := make(chan bool, 1) abort := make(chan bool, 1)
req := &Request{ req := &request{
Queues: strings.Split(r.FormValue("queues"), ","), queues: strings.Split(r.FormValue("queues"), ","),
ResponseCh: rch, responseCh: rch,
Abort: abort, abort: abort,
} }
go RegisterSubscription(req) go registerSubscription(req)
disconnected := w.(http.CloseNotifier).CloseNotify() disconnected := w.(http.CloseNotifier).CloseNotify()
finished := make(chan bool) finished := make(chan bool)
@ -102,7 +102,7 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
abort <- true abort <- true
case <-finished: case <-finished:
} }
req.Purge() req.purge()
}() }()
res, ok := <-rch res, ok := <-rch
@ -110,15 +110,15 @@ func SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
w.Header().Set("Queue", res.Queue) w.Header().Set("Queue", res.queue)
w.Write(res.Message) w.Write(res.message)
finished <- true finished <- true
} }
func SetupServer() { func setupServer() {
http.HandleFunc("/status", StatusHandler) http.HandleFunc("/status", statusHandler)
http.HandleFunc("/debug", DebugHandler) http.HandleFunc("/debug", debugHandler)
http.HandleFunc("/publish", PublishHandler) http.HandleFunc("/publish", publishHandler)
http.HandleFunc("/subscribe", SubscriptionHandler) http.HandleFunc("/subscribe", subscriptionHandler)
} }

View File

@ -6,63 +6,64 @@ import (
) )
type ( type (
QueueState map[string]uint queueState map[string]uint
ServerState map[string]QueueState serverState map[string]queueState
) )
const ( const (
StateMetaKey = "state" stateMetaKey = "state"
StateSaveInterval = 1 // seconds stateSaveInterval = 1 // seconds
) )
func SaveState() { func saveState() {
state := make(ServerState) state := make(serverState)
for _, q := range queues { for _, q := range queues {
state[q.Name] = QueueState{ state[q.name] = queueState{
"wi": q.Counter.WriteIndex, "wi": q.counter.writeIndex,
"ri": q.Counter.ReadIndex, "ri": q.counter.readIndex,
} }
} }
jsn, _ := json.Marshal(state) jsn, _ := json.Marshal(state)
key := Key(StateMetaKey) k := key(stateMetaKey)
if err := storage.Set(key, jsn); err != nil { if err := storage.Set(k, jsn); err != nil {
Error(err, "Failed to persist state") alert(err, "Failed to persist state")
return return
} }
} }
func LoadState() { func loadState() {
state := make(ServerState) state := make(serverState)
key := Key(StateMetaKey) k := key(stateMetaKey)
jsn, err := storage.Get(key) jsn, err := storage.Get(k)
if err != nil { if err != nil {
Log("State not found") log("State not found")
return return
} }
err = json.Unmarshal(jsn, &state) err = json.Unmarshal(jsn, &state)
if err != nil { if err != nil {
Log("Failed to load state") log("Failed to load state")
return return
} }
for qname, meta := range state { for qname, meta := range state {
RegisterQueue(qname, meta["wi"], meta["ri"]) registerQueue(qname, meta["wi"], meta["ri"])
} }
Log("State successfully loaded") log("State successfully loaded")
} }
func KeepStatePersisted() { func keepStatePersisted() {
t := time.NewTicker(StateSaveInterval * time.Second) t := time.NewTicker(stateSaveInterval * time.Second)
for { for {
<-t.C <-t.C
SaveState() saveState()
err := storage.Sync(false) err := storage.Sync(false)
if err != nil { if err != nil {
Error(err, "Failed to sync storage") alert(err, "Failed to sync storage")
} }
} }
} }

View File

@ -8,41 +8,42 @@ import (
) )
type ( type (
Message []byte message []byte
Key []byte key []byte
) )
var ( var (
storage = cabinet.New() storage = cabinet.New()
) )
func NewKey(queue string, index uint) Key { func newKey(queue string, index uint) key {
istr := strconv.FormatUint(uint64(index), 10) istr := strconv.FormatUint(uint64(index), 10)
key := strings.Join([]string{queue, istr}, "_") k := strings.Join([]string{queue, istr}, "_")
return Key(key)
return key(k)
} }
func SetupStorage() { func setupStorage() {
err := storage.Open(Config.Storage, cabinet.KCOWRITER|cabinet.KCOCREATE) err := storage.Open(config.storage, cabinet.KCOWRITER|cabinet.KCOCREATE)
if err != nil { if err != nil {
Error(err, "Failed to open database '%s'", Config.Storage) alert(err, "Failed to open database '%s'", config.storage)
} }
} }
func CloseStorage() { func closeStorage() {
var err error var err error
err = storage.Sync(true) err = storage.Sync(true)
if err != nil { if err != nil {
Error(err, "Failed to sync storage (hard)") alert(err, "Failed to sync storage (hard)")
} else { } else {
Log("Storage synchronized") log("Storage synchronized")
} }
err = storage.Close() err = storage.Close()
if err != nil { if err != nil {
Error(err, "Failed to close storage") alert(err, "Failed to close storage")
} else { } else {
Log("Storage closed") log("Storage closed")
} }
} }