Auth works
This commit is contained in:
+40
-16
@@ -4,35 +4,60 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
|
||||
"github.com/fatih/structs"
|
||||
"github.com/localhots/empact/db"
|
||||
"github.com/localhots/empact/task"
|
||||
)
|
||||
|
||||
type (
|
||||
Job struct {
|
||||
Name string
|
||||
actor func(*db.Task)
|
||||
actor func(task.Tasker)
|
||||
workers map[string]*worker
|
||||
tasks chan *db.Task
|
||||
orders chan struct{} // Currently shutdown only
|
||||
tasks chan task.Tasker
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
)
|
||||
|
||||
func New(name string, actor func()) *Job {
|
||||
&Job{
|
||||
var (
|
||||
jobs = map[string]*Job{}
|
||||
)
|
||||
|
||||
func Enqueue(t task.Tasker) {
|
||||
dt := t.T()
|
||||
dt.Job = structs.Name(t)
|
||||
dt.CreatedAt = time.Now()
|
||||
|
||||
j, ok := jobs[dt.Job]
|
||||
if !ok {
|
||||
switch dt.Job {
|
||||
case "FetchAccessTokenTask":
|
||||
j = New(dt.Job, task.FetchAccessToken)
|
||||
case "SyncContribTask":
|
||||
j = New(dt.Job, task.SyncContrib)
|
||||
case "SyncReposTask":
|
||||
j = New(dt.Job, task.SyncRepos)
|
||||
default:
|
||||
panic("Unknown task: " + dt.Job)
|
||||
}
|
||||
jobs[dt.Job] = j
|
||||
j.Resize(1)
|
||||
}
|
||||
j.tasks <- t
|
||||
}
|
||||
|
||||
func New(name string, actor func(task.Tasker)) *Job {
|
||||
return &Job{
|
||||
Name: name,
|
||||
actor: actor,
|
||||
workers: make(map[string]*worker),
|
||||
tasks: make(chan *db.Task, 1000),
|
||||
orders: make(chan struct{}),
|
||||
tasks: make(chan task.Tasker, 1000),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Job) Perform(t *db.Task) {
|
||||
t.Job = structs.Name(t)
|
||||
t.CreatedAt = time.Now()
|
||||
j.tasks <- t
|
||||
}
|
||||
|
||||
func (j *Job) Size() int {
|
||||
return len(j.workers)
|
||||
}
|
||||
@@ -49,9 +74,8 @@ func (j *Job) Resize(n int) {
|
||||
if del := n - len(j.workers); del > 0 {
|
||||
for i := 0; i < del; i++ {
|
||||
w := &worker{
|
||||
id: newID(),
|
||||
job: j,
|
||||
shutdown: make(<-chan struct{}, 1),
|
||||
id: uuid.New(),
|
||||
job: j,
|
||||
}
|
||||
go w.workHard()
|
||||
j.workers[w.id] = w
|
||||
@@ -59,7 +83,7 @@ func (j *Job) Resize(n int) {
|
||||
j.wg.Add(del)
|
||||
} else {
|
||||
for i := 0; i > del; i-- {
|
||||
j.shutdown <- struct{}{}
|
||||
j.orders <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+15
-17
@@ -1,17 +1,17 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
"github.com/localhots/empact/db"
|
||||
"github.com/localhots/empact/task"
|
||||
)
|
||||
|
||||
type (
|
||||
worker struct {
|
||||
id string
|
||||
job *Job
|
||||
shutdown <-chan struct{}
|
||||
id string
|
||||
job *Job
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@ func (w *worker) workHard() {
|
||||
defer w.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-w.shutdown:
|
||||
case <-w.job.orders:
|
||||
return
|
||||
case t := <-w.job.tasks:
|
||||
w.perform(t)
|
||||
@@ -27,19 +27,17 @@ func (w *worker) workHard() {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *worker) perform(t *db.Task) {
|
||||
t.Worker = w.id
|
||||
t.StartedAt = time.Now()
|
||||
func (w *worker) perform(t task.Tasker) {
|
||||
dt := t.T()
|
||||
dt.Worker = w.id
|
||||
dt.StartedAt = time.Now()
|
||||
defer func() {
|
||||
err := recover()
|
||||
t.Duration = time.Since(t.StartedAt).Nanoseconds()
|
||||
t.Error = err.String()
|
||||
t.Save()
|
||||
if err := recover(); err != nil {
|
||||
// dt.Error = err.(string)
|
||||
}
|
||||
dt.Duration = time.Since(dt.StartedAt).Nanoseconds()
|
||||
dt.Save()
|
||||
}()
|
||||
|
||||
w.job.actor(t)
|
||||
}
|
||||
|
||||
func newID() string {
|
||||
return uuid.New()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user