diff --git a/job/job.go b/job/job.go new file mode 100644 index 0000000..c77c791 --- /dev/null +++ b/job/job.go @@ -0,0 +1,46 @@ +package job + +import ( + "sync" +) + +type ( + Job struct { + Name string + actor func() + workers map[string]*worker + orders chan order + wg sync.WaitGroup + } +) + +func New(name string, actor func()) *Job { + &Job{ + Name: name, + actor: actor, + workers: make(map[string]*worker), + orders: make(chan order), + } +} + +func (j *Job) Workers(n int) { + if n < 0 { + n = 0 + } + if del := n - len(j.workers); del > 0 { + for i := 0; i < del; i++ { + w := &worker{ + id: newID(), + job: j.Name, + wg: j.wg, + actor: j.actor, + } + go w.workHard() + } + j.wg.Add(del) + } else { + for i := 0; i > del; i-- { + j.orders <- stop + } + } +} diff --git a/job/management.go b/job/management.go new file mode 100644 index 0000000..d41012c --- /dev/null +++ b/job/management.go @@ -0,0 +1,23 @@ +package job + +import ( + "time" + + "code.google.com/p/go-uuid/uuid" +) + +type ( + order byte + report struct { + duration time.Duration + success bool + } +) + +const ( + stop order = iota +) + +func newID() string { + return uuid.New() +} diff --git a/job/worker.go b/job/worker.go new file mode 100644 index 0000000..98021f1 --- /dev/null +++ b/job/worker.go @@ -0,0 +1,47 @@ +package job + +import ( + "sync" + "time" +) + +type ( + worker struct { + id string + job string + wg *sync.WaitGroup + actor func() + reports chan<- report + orders <-chan order + } +) + +func (w *worker) workHard() { + defer w.wg.Done() + for { + select { + case o := <-w.orders: + switch o { + case stop: + return + default: + panic("Confused") + } + default: + action() + } + } +} + +func (w *worker) action() { + start := time.Now() + defer func() { + err := recover() + w.reports <- report{ + duration: time.Since(start), + success: (err == nil), + } + }() + + w.actor() +}