1
0
Fork 0

Job abstraction

This commit is contained in:
Gregory Eremin 2015-03-04 18:43:44 +07:00
parent a4da88659d
commit 9bb84d27b7
3 changed files with 116 additions and 0 deletions

46
job/job.go Normal file
View File

@ -0,0 +1,46 @@
package job
import (
"sync"
)
type (
Job struct {
Name string
actor func()
workers map[string]*worker
orders chan order
wg sync.WaitGroup
}
)
func New(name string, actor func()) *Job {
&Job{
Name: name,
actor: actor,
workers: make(map[string]*worker),
orders: make(chan order),
}
}
func (j *Job) Workers(n int) {
if n < 0 {
n = 0
}
if del := n - len(j.workers); del > 0 {
for i := 0; i < del; i++ {
w := &worker{
id: newID(),
job: j.Name,
wg: j.wg,
actor: j.actor,
}
go w.workHard()
}
j.wg.Add(del)
} else {
for i := 0; i > del; i-- {
j.orders <- stop
}
}
}

23
job/management.go Normal file
View File

@ -0,0 +1,23 @@
package job
import (
"time"
"code.google.com/p/go-uuid/uuid"
)
type (
order byte
report struct {
duration time.Duration
success bool
}
)
const (
stop order = iota
)
func newID() string {
return uuid.New()
}

47
job/worker.go Normal file
View File

@ -0,0 +1,47 @@
package job
import (
"sync"
"time"
)
type (
worker struct {
id string
job string
wg *sync.WaitGroup
actor func()
reports chan<- report
orders <-chan order
}
)
func (w *worker) workHard() {
defer w.wg.Done()
for {
select {
case o := <-w.orders:
switch o {
case stop:
return
default:
panic("Confused")
}
default:
action()
}
}
}
func (w *worker) action() {
start := time.Now()
defer func() {
err := recover()
w.reports <- report{
duration: time.Since(start),
success: (err == nil),
}
}()
w.actor()
}