Remove unused code
This commit is contained in:
parent
638a61e8ab
commit
6ed63b93ff
68
counter.go
68
counter.go
|
@ -1,68 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
maxIndex = ^uint(0)
|
||||
)
|
||||
|
||||
type (
|
||||
// Counter is responsible for operating queue read and write indexes
|
||||
counter struct {
|
||||
writeIndex uint // Number of the record last written to the queue
|
||||
readIndex uint // Number of the record last read from the queue
|
||||
// If WriteIndex is greater than ReadIndex then there are unread messages
|
||||
// If WriteIndex is less tham ReadIndex then MaxIndex was reached
|
||||
|
||||
read chan uint
|
||||
mutex sync.Mutex
|
||||
streaming *sync.Cond
|
||||
}
|
||||
)
|
||||
|
||||
func newCounter(wi, ri uint) *counter {
|
||||
m := &sync.Mutex{}
|
||||
m.Lock()
|
||||
|
||||
c := &counter{
|
||||
writeIndex: wi,
|
||||
readIndex: ri,
|
||||
read: make(chan uint),
|
||||
streaming: sync.NewCond(m),
|
||||
}
|
||||
|
||||
go c.stream()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *counter) write(proc func(i uint) bool) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
ok := proc(c.writeIndex + 1)
|
||||
if ok {
|
||||
c.writeIndex++
|
||||
c.streaming.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *counter) distance() uint {
|
||||
d := c.writeIndex - c.readIndex
|
||||
if d < 0 {
|
||||
d += maxIndex
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func (c *counter) stream() {
|
||||
for {
|
||||
if c.distance() == 0 {
|
||||
c.streaming.Wait()
|
||||
}
|
||||
|
||||
c.read <- c.readIndex + 1
|
||||
c.readIndex++
|
||||
}
|
||||
}
|
75
queue.go
75
queue.go
|
@ -1,75 +0,0 @@
|
|||
package main
|
||||
|
||||
type (
|
||||
queue struct {
|
||||
name string
|
||||
counter *counter
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
queues = make(map[string]*queue)
|
||||
)
|
||||
|
||||
func (q *queue) push(msg message) bool {
|
||||
var err error
|
||||
|
||||
q.counter.write(func(i uint) bool {
|
||||
key := newKey(q.name, i)
|
||||
err = storage.Set(key, msg)
|
||||
if err != nil {
|
||||
alert(err, "Failed to write %d bytes to record '%s'", len(msg), key)
|
||||
}
|
||||
|
||||
return (err == nil)
|
||||
})
|
||||
|
||||
return (err == nil)
|
||||
}
|
||||
|
||||
func (q *queue) tryFetch(abort chan bool) (message, bool) {
|
||||
if q.counter.distance() > 0 {
|
||||
return q.fetch(abort)
|
||||
} else {
|
||||
return message{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) fetch(abort chan bool) (message, bool) {
|
||||
var i uint
|
||||
|
||||
select {
|
||||
case i = <-q.counter.read:
|
||||
case <-abort:
|
||||
return message{}, false
|
||||
}
|
||||
|
||||
k := newKey(q.name, i)
|
||||
msg, err := storage.Get(k)
|
||||
if err != nil {
|
||||
alert(err, "Failed to read record '%s'", k)
|
||||
return msg, false
|
||||
}
|
||||
|
||||
err = storage.Remove(k)
|
||||
if err != nil {
|
||||
alert(err, "Failed to delete record '%s'", k)
|
||||
return msg, false
|
||||
}
|
||||
|
||||
return msg, true
|
||||
}
|
||||
|
||||
func getQueue(name string) *queue {
|
||||
if _, ok := queues[name]; !ok {
|
||||
registerQueue(name, 0, 0)
|
||||
}
|
||||
return queues[name]
|
||||
}
|
||||
|
||||
func registerQueue(name string, wi, ri uint) {
|
||||
queues[name] = &queue{
|
||||
name: name,
|
||||
counter: newCounter(wi, ri),
|
||||
}
|
||||
}
|
97
request.go
97
request.go
|
@ -1,97 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type (
|
||||
request struct {
|
||||
queues []string
|
||||
responseCh chan response
|
||||
abort chan bool
|
||||
dead bool
|
||||
}
|
||||
response struct {
|
||||
queue string
|
||||
message message
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
pool struct {
|
||||
requests []*request
|
||||
mutex sync.Mutex
|
||||
}
|
||||
)
|
||||
|
||||
func registerPublication(q string, msg message) bool {
|
||||
for _, r := range pool.requests {
|
||||
if r.dead {
|
||||
continue
|
||||
}
|
||||
for _, qname := range r.queues {
|
||||
if qname == q {
|
||||
rsp := response{queue: q, message: msg}
|
||||
ok := r.tryRespond(rsp)
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ok := getQueue(q).push(msg)
|
||||
return ok
|
||||
}
|
||||
|
||||
func registerSubscription(r *request) {
|
||||
for _, qname := range r.queues {
|
||||
q := getQueue(qname)
|
||||
msg, ok := q.tryFetch(r.abort)
|
||||
if ok {
|
||||
rsp := response{queue: qname, message: msg}
|
||||
ok := r.tryRespond(rsp)
|
||||
if !ok {
|
||||
q.push(msg)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
pool.requests = append(pool.requests, r)
|
||||
}
|
||||
|
||||
func (r *request) tryRespond(rsp response) bool {
|
||||
okch := make(chan bool)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err != nil { // Panic!
|
||||
r.dead = true
|
||||
okch <- false
|
||||
}
|
||||
}()
|
||||
|
||||
r.responseCh <- rsp // If channel is already closed expect a panic
|
||||
okch <- true
|
||||
}()
|
||||
|
||||
ok := <-okch
|
||||
return ok
|
||||
}
|
||||
|
||||
func (r *request) purge() {
|
||||
pool.mutex.Lock()
|
||||
defer pool.mutex.Unlock()
|
||||
|
||||
r.dead = true
|
||||
deleted := 0
|
||||
for i, req := range pool.requests {
|
||||
if req.dead {
|
||||
pool.requests = append(pool.requests[:i-deleted], pool.requests[i-deleted+1:]...)
|
||||
deleted++
|
||||
}
|
||||
}
|
||||
}
|
69
state.go
69
state.go
|
@ -1,69 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
queueState map[string]uint
|
||||
serverState map[string]queueState
|
||||
)
|
||||
|
||||
const (
|
||||
stateMetaKey = "state"
|
||||
stateSaveInterval = 1 // seconds
|
||||
)
|
||||
|
||||
func saveState() {
|
||||
state := make(serverState)
|
||||
for _, q := range queues {
|
||||
state[q.name] = queueState{
|
||||
"wi": q.counter.writeIndex,
|
||||
"ri": q.counter.readIndex,
|
||||
}
|
||||
}
|
||||
|
||||
jsn, _ := json.Marshal(state)
|
||||
k := key(stateMetaKey)
|
||||
if err := storage.Set(k, jsn); err != nil {
|
||||
alert(err, "Failed to persist state")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func loadState() {
|
||||
state := make(serverState)
|
||||
k := key(stateMetaKey)
|
||||
|
||||
jsn, err := storage.Get(k)
|
||||
if err != nil {
|
||||
log("State not found")
|
||||
return
|
||||
}
|
||||
|
||||
err = json.Unmarshal(jsn, &state)
|
||||
if err != nil {
|
||||
log("Failed to load state")
|
||||
return
|
||||
}
|
||||
|
||||
for qname, meta := range state {
|
||||
registerQueue(qname, meta["wi"], meta["ri"])
|
||||
}
|
||||
|
||||
log("State successfully loaded")
|
||||
}
|
||||
|
||||
func keepStatePersisted() {
|
||||
t := time.NewTicker(stateSaveInterval * time.Second)
|
||||
|
||||
for {
|
||||
<-t.C
|
||||
saveState()
|
||||
err := storage.Sync(false)
|
||||
if err != nil {
|
||||
alert(err, "Failed to sync storage")
|
||||
}
|
||||
}
|
||||
}
|
49
storage.go
49
storage.go
|
@ -1,49 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"bitbucket.org/ww/cabinet"
|
||||
)
|
||||
|
||||
type (
|
||||
message []byte
|
||||
key []byte
|
||||
)
|
||||
|
||||
var (
|
||||
storage = cabinet.New()
|
||||
)
|
||||
|
||||
func newKey(queue string, index uint) key {
|
||||
istr := strconv.FormatUint(uint64(index), 10)
|
||||
k := strings.Join([]string{queue, istr}, "_")
|
||||
|
||||
return key(k)
|
||||
}
|
||||
|
||||
func setupStorage() {
|
||||
err := storage.Open(config.storage, cabinet.KCOWRITER|cabinet.KCOCREATE)
|
||||
if err != nil {
|
||||
alert(err, "Failed to open database '%s'", config.storage)
|
||||
}
|
||||
}
|
||||
|
||||
func closeStorage() {
|
||||
var err error
|
||||
|
||||
err = storage.Sync(true)
|
||||
if err != nil {
|
||||
alert(err, "Failed to sync storage (hard)")
|
||||
} else {
|
||||
log("Storage synchronized")
|
||||
}
|
||||
|
||||
err = storage.Close()
|
||||
if err != nil {
|
||||
alert(err, "Failed to close storage")
|
||||
} else {
|
||||
log("Storage closed")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue