From 522d53b14be64b9f87e15922caf6065a4892c93b Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Fri, 6 Mar 2015 17:01:55 +0700 Subject: [PATCH] We don't need jobs! --- job/job.go | 89 -------------------------------------------- job/worker.go | 43 --------------------- task/access_token.go | 15 ++------ task/common.go | 7 ---- task/sync_contrib.go | 19 +++------- task/sync_repos.go | 17 +++------ 6 files changed, 13 insertions(+), 177 deletions(-) delete mode 100644 job/job.go delete mode 100644 job/worker.go diff --git a/job/job.go b/job/job.go deleted file mode 100644 index 7c5a753..0000000 --- a/job/job.go +++ /dev/null @@ -1,89 +0,0 @@ -package job - -import ( - "sync" - "time" - - "code.google.com/p/go-uuid/uuid" - - "github.com/fatih/structs" - "github.com/localhots/empact/task" -) - -type ( - Job struct { - Name string - actor func(task.Tasker) - workers map[string]*worker - orders chan struct{} // Currently shutdown only - tasks chan task.Tasker - wg sync.WaitGroup - } -) - -var ( - jobs = map[string]*Job{} -) - -func Enqueue(t task.Tasker) { - dt := t.T() - dt.Job = structs.Name(t) - dt.CreatedAt = time.Now() - - j, ok := jobs[dt.Job] - if !ok { - switch dt.Job { - case "FetchAccessTokenTask": - j = New(dt.Job, task.FetchAccessToken) - case "SyncContribTask": - j = New(dt.Job, task.SyncContrib) - case "SyncReposTask": - j = New(dt.Job, task.SyncRepos) - default: - panic("Unknown task: " + dt.Job) - } - jobs[dt.Job] = j - j.Resize(1) - } - j.tasks <- t -} - -func New(name string, actor func(task.Tasker)) *Job { - return &Job{ - Name: name, - actor: actor, - workers: make(map[string]*worker), - orders: make(chan struct{}), - tasks: make(chan task.Tasker, 1000), - } -} - -func (j *Job) Size() int { - return len(j.workers) -} - -func (j *Job) Stop() { - j.Resize(0) - j.wg.Wait() -} - -func (j *Job) Resize(n int) { - if n < 0 { - n = 0 - } - if del := n - len(j.workers); del > 0 { - for i := 0; i < del; i++ { - w := &worker{ - id: uuid.New(), - job: j, - } - go w.workHard() - j.workers[w.id] = w - } - j.wg.Add(del) - } else { - for i := 0; i > del; i-- { - j.orders <- struct{}{} - } - } -} diff --git a/job/worker.go b/job/worker.go deleted file mode 100644 index 754465c..0000000 --- a/job/worker.go +++ /dev/null @@ -1,43 +0,0 @@ -package job - -import ( - "sync" - "time" - - "github.com/localhots/empact/task" -) - -type ( - worker struct { - id string - job *Job - wg *sync.WaitGroup - } -) - -func (w *worker) workHard() { - defer w.wg.Done() - for { - select { - case <-w.job.orders: - return - case t := <-w.job.tasks: - w.perform(t) - } - } -} - -func (w *worker) perform(t task.Tasker) { - dt := t.T() - dt.Worker = w.id - dt.StartedAt = time.Now() - defer func() { - if err := recover(); err != nil { - // dt.Error = err.(string) - } - dt.Duration = time.Since(dt.StartedAt).Nanoseconds() - dt.Save() - }() - - w.job.actor(t) -} diff --git a/task/access_token.go b/task/access_token.go index 87987e1..829e058 100644 --- a/task/access_token.go +++ b/task/access_token.go @@ -12,20 +12,11 @@ import ( "github.com/localhots/empact/db" ) -type ( - FetchAccessTokenTask struct { - Code string - Result chan string - *db.Task - } -) - -func FetchAccessToken(tk Tasker) { - t := tk.(*FetchAccessTokenTask) +func FetchAccessToken(code string, result chan string) { payload := url.Values{} payload.Set("client_id", config.C().ClientID) payload.Set("client_secret", config.C().ClientSecret) - payload.Set("code", t.Code) + payload.Set("code", code) payload.Set("redirect_uri", config.C().RedirectURI) buf := bytes.NewBuffer([]byte(payload.Encode())) @@ -72,5 +63,5 @@ func FetchAccessToken(tk Tasker) { fmt.Println("Saving token", tok) tok.Save() - t.Result <- user.Login + result <- user.Login } diff --git a/task/common.go b/task/common.go index dcee152..ac4b64b 100644 --- a/task/common.go +++ b/task/common.go @@ -6,13 +6,6 @@ import ( "github.com/localhots/empact/db" ) -type ( - Tasker interface { - Save() - T() *db.Task - } -) - func newGithubClient(token string) *github.Client { trans := &oauth.Transport{ Token: &oauth.Token{AccessToken: token}, diff --git a/task/sync_contrib.go b/task/sync_contrib.go index f1d7aa9..d895d0f 100644 --- a/task/sync_contrib.go +++ b/task/sync_contrib.go @@ -4,22 +4,13 @@ import ( "github.com/localhots/empact/db" ) -type ( - SyncContribTask struct { - Repo string - *db.Task - } -) - -func SyncContrib(tk Tasker) { - t := tk.(*SyncContribTask) - client := newGithubClient(t.Token) - contribs, resp, err := client.Repositories.ListContributorsStats(t.Owner, t.Repo) - saveResponseMeta(t.Token, resp) +func SyncContrib(token, owner, repo string) { + client := newGithubClient(token) + contribs, resp, err := client.Repositories.ListContributorsStats(owner, repo) + saveResponseMeta(token, resp) if err != nil { if err.Error() == "EOF" { - // Empty repository, not an actual error - return + return // Empty repository, not an actual error } panic(err) } diff --git a/task/sync_repos.go b/task/sync_repos.go index 0dad71c..cfb6c10 100644 --- a/task/sync_repos.go +++ b/task/sync_repos.go @@ -5,29 +5,22 @@ import ( "github.com/localhots/empact/db" ) -type ( - SyncReposTask struct { - *db.Task - } -) - -func SyncRepos(tk Tasker) { - t := tk.(*SyncReposTask) - client := newGithubClient(t.Token) +func SyncRepos(token, owner string) { + client := newGithubClient(token) opt := &github.RepositoryListByOrgOptions{ ListOptions: github.ListOptions{}, } for { opt.Page++ - repos, resp, err := client.Repositories.ListByOrg(t.Owner, opt) - saveResponseMeta(t.Token, resp) + repos, resp, err := client.Repositories.ListByOrg(owner, opt) + saveResponseMeta(token, resp) if err != nil { panic(err) } for _, repo := range repos { r := &db.Repo{ - Owner: t.Owner, + Owner: owner, Name: *repo.Name, } r.Save()