From 3216f261c3894a53a8310c8f7eaa7f7b06e9ac29 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Thu, 5 Mar 2015 02:08:36 +0700 Subject: [PATCH] Jobs simplified --- job/job.go | 31 +++++++++++++++++-------- job/management.go | 15 +++++++----- job/worker.go | 33 ++++++++++---------------- task/common.go | 13 +++++++++++ task/sync_contrib.go | 55 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 36 deletions(-) create mode 100644 task/common.go create mode 100644 task/sync_contrib.go diff --git a/job/job.go b/job/job.go index c77c791..d7258ec 100644 --- a/job/job.go +++ b/job/job.go @@ -7,9 +7,9 @@ import ( type ( Job struct { Name string - actor func() + actor func(Task) workers map[string]*worker - orders chan order + tasks chan Task wg sync.WaitGroup } ) @@ -19,28 +19,41 @@ func New(name string, actor func()) *Job { Name: name, actor: actor, workers: make(map[string]*worker), - orders: make(chan order), + tasks: make(chan Task), } } -func (j *Job) Workers(n int) { +func (j *Job) Perform(t Task) { + j.tasks <- t +} + +func (j *Job) Size() int { + return len(j.workers) +} + +func (j *Job) Stop() { + j.Resize(0) + j.wg.Wait() +} + +func (j *Job) Resize(n int) { if n < 0 { n = 0 } if del := n - len(j.workers); del > 0 { for i := 0; i < del; i++ { w := &worker{ - id: newID(), - job: j.Name, - wg: j.wg, - actor: j.actor, + id: newID(), + job: j, + shutdown: make(<-chan struct{}, 1), } go w.workHard() + j.workers[w.id] = w } j.wg.Add(del) } else { for i := 0; i > del; i-- { - j.orders <- stop + j.shutdown <- struct{}{} } } } diff --git a/job/management.go b/job/management.go index d41012c..73058a4 100644 --- a/job/management.go +++ b/job/management.go @@ -4,20 +4,23 @@ import ( "time" "code.google.com/p/go-uuid/uuid" + "github.com/fatih/structs" ) type ( - order byte + Task struct{} report struct { duration time.Duration - success bool + err error } ) -const ( - stop order = iota -) - func newID() string { return uuid.New() } + +func (t *Task) report(rep report) { + meta := structs.Map(t) + meta["duration"] = rep.duration + meta["error"] = rep.err +} diff --git a/job/worker.go b/job/worker.go index 98021f1..ccc682a 100644 --- a/job/worker.go +++ b/job/worker.go @@ -1,18 +1,14 @@ package job import ( - "sync" "time" ) type ( worker struct { - id string - job string - wg *sync.WaitGroup - actor func() - reports chan<- report - orders <-chan order + id string + job *Job + shutdown <-chan struct{} } ) @@ -20,28 +16,23 @@ func (w *worker) workHard() { defer w.wg.Done() for { select { - case o := <-w.orders: - switch o { - case stop: - return - default: - panic("Confused") - } - default: - action() + case <-w.shutdown: + return + case t := <-w.job.tasks: + w.perform(t) } } } -func (w *worker) action() { +func (w *worker) perform(t Task) { start := time.Now() defer func() { err := recover() - w.reports <- report{ + t.report(report{ duration: time.Since(start), - success: (err == nil), - } + success: err, + }) }() - w.actor() + w.job.actor(t) } diff --git a/task/common.go b/task/common.go new file mode 100644 index 0000000..6a7f3f5 --- /dev/null +++ b/task/common.go @@ -0,0 +1,13 @@ +package task + +import ( + "code.google.com/p/goauth2/oauth" + "github.com/google/go-github/github" +) + +func newGithubClient(token string) *github.Client { + trans := &oauth.Transport{ + Token: &oauth.Token{AccessToken: token}, + } + return github.NewClient(trans.Client()) +} diff --git a/task/sync_contrib.go b/task/sync_contrib.go new file mode 100644 index 0000000..f66fa21 --- /dev/null +++ b/task/sync_contrib.go @@ -0,0 +1,55 @@ +package task + +import ( + "github.com/google/go-github/github" + "github.com/localhots/steward/db" + "github.com/localhots/steward/job" +) + +type ( + SyncContribTask struct { + Owner string + Repo string + Token string + job.Task + } +) + +func SyncContrib(t SyncContribTask) { + contribs := fetchContrib(newGithubClient(t.Token), t.Owner, t.Repo) + for _, c := range contribs { + db.ImportRepo(c) + } +} + +func fetchContrib(client *github.Client, owner, repo string) (res []*db.Contrib) { + contribs, resp, err := client.Repositories.ListContributorsStats(owner, repo) + // c.saveResponseMeta(resp) + if err != nil { + if err.Error() == "EOF" { + // Empty repository, not an actual error + return + } + panic(err) + } + + for _, c := range contribs { + for _, week := range c.Weeks { + if *week.Commits == 0 { + continue + } + + res = append(res, &db.Contrib{ + Week: week.Week.Time.Unix(), + Author: *c.Author.Login, + Owner: owner, + Repo: repo, + Commits: *week.Commits, + Additions: *week.Additions, + Deletions: *week.Deletions, + }) + } + } + + return +}