Jobs simplified

This commit is contained in:
2015-03-05 02:08:36 +07:00
parent 9bb84d27b7
commit 3216f261c3
5 changed files with 111 additions and 36 deletions
+22 -9
View File
@@ -7,9 +7,9 @@ import (
type (
Job struct {
Name string
actor func()
actor func(Task)
workers map[string]*worker
orders chan order
tasks chan Task
wg sync.WaitGroup
}
)
@@ -19,28 +19,41 @@ func New(name string, actor func()) *Job {
Name: name,
actor: actor,
workers: make(map[string]*worker),
orders: make(chan order),
tasks: make(chan Task),
}
}
func (j *Job) Workers(n int) {
func (j *Job) Perform(t Task) {
j.tasks <- t
}
func (j *Job) Size() int {
return len(j.workers)
}
func (j *Job) Stop() {
j.Resize(0)
j.wg.Wait()
}
func (j *Job) Resize(n int) {
if n < 0 {
n = 0
}
if del := n - len(j.workers); del > 0 {
for i := 0; i < del; i++ {
w := &worker{
id: newID(),
job: j.Name,
wg: j.wg,
actor: j.actor,
id: newID(),
job: j,
shutdown: make(<-chan struct{}, 1),
}
go w.workHard()
j.workers[w.id] = w
}
j.wg.Add(del)
} else {
for i := 0; i > del; i-- {
j.orders <- stop
j.shutdown <- struct{}{}
}
}
}
+9 -6
View File
@@ -4,20 +4,23 @@ import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/fatih/structs"
)
type (
order byte
Task struct{}
report struct {
duration time.Duration
success bool
err error
}
)
const (
stop order = iota
)
func newID() string {
return uuid.New()
}
func (t *Task) report(rep report) {
meta := structs.Map(t)
meta["duration"] = rep.duration
meta["error"] = rep.err
}
+12 -21
View File
@@ -1,18 +1,14 @@
package job
import (
"sync"
"time"
)
type (
worker struct {
id string
job string
wg *sync.WaitGroup
actor func()
reports chan<- report
orders <-chan order
id string
job *Job
shutdown <-chan struct{}
}
)
@@ -20,28 +16,23 @@ func (w *worker) workHard() {
defer w.wg.Done()
for {
select {
case o := <-w.orders:
switch o {
case stop:
return
default:
panic("Confused")
}
default:
action()
case <-w.shutdown:
return
case t := <-w.job.tasks:
w.perform(t)
}
}
}
func (w *worker) action() {
func (w *worker) perform(t Task) {
start := time.Now()
defer func() {
err := recover()
w.reports <- report{
t.report(report{
duration: time.Since(start),
success: (err == nil),
}
success: err,
})
}()
w.actor()
w.job.actor(t)
}