1
0
Fork 0

Implement configurable scaling PoC

This commit is contained in:
Gregory Eremin 2015-10-27 03:33:04 +03:00
parent b0142de4a7
commit 4eae1c3b8d
2 changed files with 61 additions and 12 deletions

View File

@ -19,8 +19,8 @@ type Satan struct {
DaemonStats stats.Publisher
Logger *log.Logger
DefaultNumWorkers int
AutoScale bool
DefaultNumWorkers uint32
ScaleSettings *ScaleSettings
daemons []Daemon
queue chan *task
@ -52,6 +52,14 @@ type Publisher interface {
Close()
}
type ScaleSettings struct {
Interval time.Duration
MinProcessedTasks uint32
LatencyThreshold time.Duration
TaskWaitThreshold time.Duration
AdjustmentStep uint32
}
type task struct {
daemon Daemon
actor Actor
@ -69,7 +77,6 @@ func Summon() *Satan {
return &Satan{
Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags),
DefaultNumWorkers: 10,
AutoScale: false,
queue: make(chan *task),
runtimeStats: stats.NewBasicStats(),
shutdownWorkers: make(chan struct{}),
@ -94,6 +101,10 @@ func (s *Satan) AddDaemon(d Daemon) {
// StartDaemons starts all registered daemons.
func (s *Satan) StartDaemons() {
s.addWorkers(s.DefaultNumWorkers)
if s.ScaleSettings != nil {
go s.autoScale()
}
}
// StopDaemons stops all running daemons.
@ -112,14 +123,14 @@ func (s *Satan) StopDaemons() {
fmt.Println(s.runtimeStats.Fetch(stats.TaskWait))
}
func (s *Satan) addWorkers(num int) {
for i := 0; i < num; i++ {
func (s *Satan) addWorkers(num uint32) {
for i := uint32(0); i < num; i++ {
go s.runWorker()
}
}
func (s *Satan) stopWorkers(num int) {
for i := 0; i < num; i++ {
func (s *Satan) stopWorkers(num uint32) {
for i := uint32(0); i < num; i++ {
s.shutdownWorkers <- struct{}{}
}
}
@ -204,6 +215,40 @@ func (s *Satan) processGeneralTask(t *task) {
t.actor() // <--- ACTION STARTS HERE
}
func (s *Satan) autoScale() {
t := time.NewTicker(s.ScaleSettings.Interval)
defer t.Stop()
for {
select {
case <-t.C:
s.adjustNumWorkers()
case <-s.shutdownSystem:
return
}
}
}
func (s *Satan) adjustNumWorkers() {
lat := s.runtimeStats.Fetch(stats.Latency)
tw := s.runtimeStats.Fetch(stats.TaskWait)
if lat.Processed() < int64(s.ScaleSettings.MinProcessedTasks) {
return
}
if lat.P95() > float64(s.ScaleSettings.LatencyThreshold) {
s.addWorkers(s.ScaleSettings.AdjustmentStep)
s.runtimeStats.Reset()
return
}
if tw.P95() > float64(s.ScaleSettings.TaskWaitThreshold) {
s.stopWorkers(s.ScaleSettings.AdjustmentStep)
s.runtimeStats.Reset()
return
}
}
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon)

View File

@ -10,7 +10,8 @@ import (
type Manager interface {
Publisher
Fetcher
Fetch(name string) Stats
Reset()
}
type Publisher interface {
@ -18,10 +19,6 @@ type Publisher interface {
Error(name string)
}
type Fetcher interface {
Fetch(name string) Stats
}
type Stats interface {
Processed() int64
Errors() int64
@ -62,6 +59,13 @@ func (b *base) Fetch(name string) Stats {
return b.metrics(name)
}
func (b *base) Reset() {
for _, s := range b.stats {
s.time.Clear()
s.errors.Clear()
}
}
func (s *baseStats) Processed() int64 {
return s.time.Count()
}