
There's no need to balance the number of workers

Goroutines are too cheap to bother
Gregory Eremin 2015-10-27 03:52:54 +03:00
parent 850f0033a5
commit e21ffb6eb4
1 changed file with 8 additions and 74 deletions
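For context, this change drops the autoscaling machinery in favour of a fixed-size pool of worker goroutines that is started up front, on the premise that goroutines are cheap enough to spawn all at once. Below is a minimal, self-contained sketch of that pattern; the Pool type, Submit, and Stop names are illustrative only and are not part of this repository's API.

package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical fixed-size worker pool: every worker is started
// up front and fed from a single task channel, mirroring the pattern this
// commit switches to.
type Pool struct {
	queue chan func()
	wg    sync.WaitGroup
}

// NewPool starts numWorkers goroutines that all read from the same queue.
func NewPool(numWorkers int) *Pool {
	p := &Pool{queue: make(chan func())}
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.queue { // exits when the queue is closed
				task()
			}
		}()
	}
	return p
}

// Submit enqueues a task; it blocks until a worker picks it up.
func (p *Pool) Submit(task func()) { p.queue <- task }

// Stop closes the queue and waits for all workers to finish.
func (p *Pool) Stop() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := NewPool(10) // same default the commit introduces (DefaultNumWorkers = 10)
	for i := 0; i < 3; i++ {
		n := i
		p.Submit(func() { fmt.Println("task", n) })
	}
	p.Stop()
}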


@@ -18,11 +18,7 @@ type Satan struct {
 	Publisher Publisher
 	DaemonStats stats.Publisher
 	Logger *log.Logger
-	MinNumWorkers uint32
-	MaxNumWorkers uint32
-	numWorkers int64
-	ScalePlan *ScalePlan
+	NumWorkers int
 	daemons []Daemon
 	queue chan *task
@@ -54,14 +50,6 @@ type Publisher interface {
 	Close()
 }
-type ScalePlan struct {
-	Interval time.Duration
-	MinProcessedTasks uint32
-	LatencyThreshold time.Duration
-	TaskWaitThreshold time.Duration
-	AdjustmentStep uint32
-}
 type task struct {
 	daemon Daemon
 	actor Actor
@@ -70,6 +58,10 @@ type task struct {
 	name string
 }
+const (
+	DefaultNumWorkers = 10
+)
 var (
 	workerIndex uint64
 )
@@ -78,8 +70,7 @@ var (
 func Summon() *Satan {
 	return &Satan{
 		Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags),
-		MinNumWorkers: 10,
-		MaxNumWorkers: 1000,
+		NumWorkers: DefaultNumWorkers,
 		queue: make(chan *task),
 		runtimeStats: stats.NewBasicStats(),
 		shutdownWorkers: make(chan struct{}),
@@ -103,10 +94,8 @@ func (s *Satan) AddDaemon(d Daemon) {
 // StartDaemons starts all registered daemons.
 func (s *Satan) StartDaemons() {
-	s.addWorkers(s.MinNumWorkers)
-	if s.ScalePlan != nil {
-		go s.autoScale()
+	for i := 0; i < s.NumWorkers; i++ {
+		go s.runWorker()
 	}
 }
@@ -126,25 +115,10 @@ func (s *Satan) StopDaemons() {
 	fmt.Println(s.runtimeStats.Fetch(stats.TaskWait))
 }
-func (s *Satan) addWorkers(num uint32) {
-	for i := uint32(0); i < num; i++ {
-		go s.runWorker()
-	}
-}
-func (s *Satan) stopWorkers(num uint32) {
-	for i := uint32(0); i < num; i++ {
-		s.shutdownWorkers <- struct{}{}
-	}
-}
 func (s *Satan) runWorker() {
 	s.wgWorkers.Add(1)
 	defer s.wgWorkers.Done()
-	atomic.AddInt64(&s.numWorkers, 1)
-	defer atomic.AddInt64(&s.numWorkers, -1)
 	i := atomic.AddUint64(&workerIndex, 1)
 	s.Logger.Printf("Starting worker #%d", i)
@@ -221,46 +195,6 @@ func (s *Satan) processGeneralTask(t *task) {
 	t.actor() // <--- ACTION STARTS HERE
 }
-func (s *Satan) autoScale() {
-	t := time.NewTicker(s.ScalePlan.Interval)
-	defer t.Stop()
-	for {
-		select {
-		case <-t.C:
-			s.adjustNumWorkers()
-		case <-s.shutdownSystem:
-			return
-		}
-	}
-}
-func (s *Satan) adjustNumWorkers() {
-	lat := s.runtimeStats.Fetch(stats.Latency)
-	tw := s.runtimeStats.Fetch(stats.TaskWait)
-	if lat.Processed() < int64(s.ScalePlan.MinProcessedTasks) {
-		return
-	}
-	if uint32(s.numWorkers)+s.ScalePlan.AdjustmentStep > s.MaxNumWorkers {
-		return
-	}
-	if lat.P95() > float64(s.ScalePlan.LatencyThreshold) {
-		s.addWorkers(s.ScalePlan.AdjustmentStep)
-		s.runtimeStats.Reset()
-		return
-	}
-	if uint32(s.numWorkers)-s.ScalePlan.AdjustmentStep < s.MinNumWorkers {
-		return
-	}
-	if tw.P95() > float64(s.ScalePlan.TaskWaitThreshold) {
-		s.stopWorkers(s.ScalePlan.AdjustmentStep)
-		s.runtimeStats.Reset()
-		return
-	}
-}
 func (t *task) String() string {
 	if t.name == "" {
 		return fmt.Sprintf("[unnamed %s process]", t.daemon)