1
0
Fork 0
empact/job/job.go

66 lines
990 B
Go
Raw Normal View History

2015-03-04 11:43:44 +00:00
package job
import (
"sync"
2015-03-04 20:51:17 +00:00
"time"
"github.com/fatih/structs"
"github.com/localhots/steward/db"
2015-03-04 11:43:44 +00:00
)
type (
Job struct {
Name string
2015-03-04 20:51:17 +00:00
actor func(*db.Task)
2015-03-04 11:43:44 +00:00
workers map[string]*worker
2015-03-04 20:51:17 +00:00
tasks chan *db.Task
2015-03-04 11:43:44 +00:00
wg sync.WaitGroup
}
)
func New(name string, actor func()) *Job {
&Job{
Name: name,
actor: actor,
workers: make(map[string]*worker),
2015-03-04 20:51:17 +00:00
tasks: make(chan *db.Task, 1000),
2015-03-04 11:43:44 +00:00
}
}
2015-03-04 20:51:17 +00:00
func (j *Job) Perform(t *db.Task) {
t.Job = structs.Name(t)
t.CreatedAt = time.Now()
2015-03-04 19:08:36 +00:00
j.tasks <- t
}
// Size reports how many workers are currently recorded in the job's worker
// map.
func (j *Job) Size() int {
	count := len(j.workers)
	return count
}
// Stop resizes the job to zero workers and then blocks until the job's
// WaitGroup counter drops back to zero.
func (j *Job) Stop() {
	const noWorkers = 0

	j.Resize(noWorkers)
	j.wg.Wait()
}
func (j *Job) Resize(n int) {
2015-03-04 11:43:44 +00:00
if n < 0 {
n = 0
}
if del := n - len(j.workers); del > 0 {
for i := 0; i < del; i++ {
w := &worker{
2015-03-04 19:08:36 +00:00
id: newID(),
job: j,
shutdown: make(<-chan struct{}, 1),
2015-03-04 11:43:44 +00:00
}
go w.workHard()
2015-03-04 19:08:36 +00:00
j.workers[w.id] = w
2015-03-04 11:43:44 +00:00
}
j.wg.Add(del)
} else {
for i := 0; i > del; i-- {
2015-03-04 19:08:36 +00:00
j.shutdown <- struct{}{}
2015-03-04 11:43:44 +00:00
}
}
}