Implement processing rate limits
This commit is contained in:
parent
a9a50522fb
commit
0f4bed32c8
16
daemon.go
16
daemon.go
@ -8,6 +8,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/juju/ratelimit"
|
||||||
"github.com/localhots/satan/caller"
|
"github.com/localhots/satan/caller"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -53,6 +54,7 @@ type BaseDaemon struct {
|
|||||||
queue chan<- *task
|
queue chan<- *task
|
||||||
panicHandler func()
|
panicHandler func()
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
|
limit *ratelimit.Bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -62,6 +64,9 @@ var (
|
|||||||
|
|
||||||
// Process creates a task and then adds it to processing queue.
|
// Process creates a task and then adds it to processing queue.
|
||||||
func (d *BaseDaemon) Process(a Actor) {
|
func (d *BaseDaemon) Process(a Actor) {
|
||||||
|
if d.limit != nil {
|
||||||
|
d.limit.Wait(1)
|
||||||
|
}
|
||||||
d.queue <- &task{
|
d.queue <- &task{
|
||||||
daemon: d.self,
|
daemon: d.self,
|
||||||
actor: a,
|
actor: a,
|
||||||
@ -117,6 +122,17 @@ func (d *BaseDaemon) Publish(msg []byte) {
|
|||||||
d.publisher.Publish(msg)
|
d.publisher.Publish(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LimitRate limits the daemons processing rate.
|
||||||
|
func (d *BaseDaemon) LimitRate(times int, per time.Duration) {
|
||||||
|
rate := float64(time.Second) / float64(per) * float64(times)
|
||||||
|
if rate < 0 {
|
||||||
|
log.Println("Daemon %s processing rate was limited to %d. Using 1 instead", d.base(), rate)
|
||||||
|
rate = 1.0
|
||||||
|
}
|
||||||
|
log.Printf("Daemon %s processing rate is limited to %.2f ops/s", d.base(), rate)
|
||||||
|
d.limit = ratelimit.NewBucketWithRate(rate, 1)
|
||||||
|
}
|
||||||
|
|
||||||
// HandlePanics sets up a panic handler function for the daemon.
|
// HandlePanics sets up a panic handler function for the daemon.
|
||||||
func (d *BaseDaemon) HandlePanics(f func()) {
|
func (d *BaseDaemon) HandlePanics(f func()) {
|
||||||
d.panicHandler = f
|
d.panicHandler = f
|
||||||
|
@ -20,6 +20,7 @@ func (n *NumberPrinter) Startup() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
n.SystemProcess("Random Number Generator", n.generateNumbers)
|
n.SystemProcess("Random Number Generator", n.generateNumbers)
|
||||||
|
n.LimitRate(1, 2*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown is empty due to the lack of cleanup.
|
// Shutdown is empty due to the lack of cleanup.
|
||||||
@ -33,9 +34,6 @@ func (n *NumberPrinter) generateNumbers() {
|
|||||||
// Generate a random number between 1000 and 9999 and print it
|
// Generate a random number between 1000 and 9999 and print it
|
||||||
num := 1000 + rand.Intn(9000)
|
num := 1000 + rand.Intn(9000)
|
||||||
n.Process(n.makeActor(num))
|
n.Process(n.makeActor(num))
|
||||||
|
|
||||||
// Sleep for a second or less
|
|
||||||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ func (p *PriceConsumer) Startup() {
|
|||||||
b.Subscribe("ProductPriceUpdates", func(u PriceUpdate) {
|
b.Subscribe("ProductPriceUpdates", func(u PriceUpdate) {
|
||||||
log.Printf("Price for %q is now $%.2f", u.Product, u.Amount)
|
log.Printf("Price for %q is now $%.2f", u.Product, u.Amount)
|
||||||
})
|
})
|
||||||
|
p.LimitRate(1, 500*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown is empty because PriceConsumer requires no cleanup upon exiting.
|
// Shutdown is empty because PriceConsumer requires no cleanup upon exiting.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user