diff --git a/db/contrib.go b/db/contrib.go index 70f6b1d..9ad736c 100644 --- a/db/contrib.go +++ b/db/contrib.go @@ -17,13 +17,13 @@ type ( ) const ( - importContribQuery = "" + + saveContribQuery = "" + "replace into contributions (week, author, owner, repo, commits, additions, deletions) " + "values (?, ?, ?, ?, ?, ?, ?)" ) -func ImportContrib(c *Contrib) { - if _, err := stmt(importContribQuery).Exec(structs.Values(c)); err != nil { +func (c *Contrib) Save() { + if _, err := stmt(saveContribQuery).Exec(structs.Values(c)); err != nil { panic(err) } } diff --git a/db/repo.go b/db/repo.go index 93fdb31..3792840 100644 --- a/db/repo.go +++ b/db/repo.go @@ -12,11 +12,11 @@ type ( ) const ( - repoImportQuery = "replace into repos (owner, name, updated_at) values (?, ?, now())" + saveRepoQuery = "replace into repos (owner, name, updated_at) values (?, ?, now())" ) -func ImportRepo(r *Repo) { - if _, err := stmt(repoImportQuery).Exec(structs.Values(r)); err != nil { +func (r *Repo) Save() { + if _, err := stmt(saveRepoQuery).Exec(structs.Values(r)); err != nil { panic(err) } } diff --git a/db/task.go b/db/task.go new file mode 100644 index 0000000..1912654 --- /dev/null +++ b/db/task.go @@ -0,0 +1,32 @@ +package db + +import ( + "time" + + "github.com/fatih/structs" +) + +type ( + Task struct { + Token string + Owner string + Job string + Worker string + Duration int64 + Error string + CreatedAt time.Time + StartedAt time.Time + } +) + +const ( + saveTaskQuery = "" + + "insert into tasks (token, owner, job, worker, duration, error, created_at, started_at) " + + "values (?, ?, ?, ?, ?, ?, ?, ?)" +) + +func (t *Task) Save() { + if _, err := stmt(saveTaskQuery).Exec(structs.Values(t)); err != nil { + panic(err) + } +} diff --git a/job/job.go b/job/job.go index d7258ec..2d5840e 100644 --- a/job/job.go +++ b/job/job.go @@ -2,14 +2,18 @@ package job import ( "sync" + "time" + + "github.com/fatih/structs" + "github.com/localhots/steward/db" ) type ( Job struct { Name string - actor func(Task) + actor func(*db.Task) workers map[string]*worker - tasks chan Task + tasks chan *db.Task wg sync.WaitGroup } ) @@ -19,11 +23,13 @@ func New(name string, actor func()) *Job { Name: name, actor: actor, workers: make(map[string]*worker), - tasks: make(chan Task), + tasks: make(chan *db.Task, 1000), } } -func (j *Job) Perform(t Task) { +func (j *Job) Perform(t *db.Task) { + t.Job = structs.Name(t) + t.CreatedAt = time.Now() j.tasks <- t } diff --git a/job/management.go b/job/management.go deleted file mode 100644 index 73058a4..0000000 --- a/job/management.go +++ /dev/null @@ -1,26 +0,0 @@ -package job - -import ( - "time" - - "code.google.com/p/go-uuid/uuid" - "github.com/fatih/structs" -) - -type ( - Task struct{} - report struct { - duration time.Duration - err error - } -) - -func newID() string { - return uuid.New() -} - -func (t *Task) report(rep report) { - meta := structs.Map(t) - meta["duration"] = rep.duration - meta["error"] = rep.err -} diff --git a/job/worker.go b/job/worker.go index ccc682a..ab85753 100644 --- a/job/worker.go +++ b/job/worker.go @@ -2,6 +2,9 @@ package job import ( "time" + + "code.google.com/p/go-uuid/uuid" + "github.com/localhots/steward/db" ) type ( @@ -24,15 +27,19 @@ func (w *worker) workHard() { } } -func (w *worker) perform(t Task) { - start := time.Now() +func (w *worker) perform(t *db.Task) { + t.Worker = w.id + t.StartedAt = time.Now() defer func() { err := recover() - t.report(report{ - duration: time.Since(start), - success: err, - }) + t.Duration = time.Since(t.StartedAt).Nanoseconds() + t.Error = err.String() + t.Save() }() w.job.actor(t) } + +func newID() string { + return uuid.New() +} diff --git a/task/sync_contrib.go b/task/sync_contrib.go index 8554d49..4e3276e 100644 --- a/task/sync_contrib.go +++ b/task/sync_contrib.go @@ -2,29 +2,19 @@ package task import ( "github.com/localhots/steward/db" - "github.com/localhots/steward/job" ) type ( SyncContribTask struct { - Owner string - Repo string - Token string - job.Task + Repo string + db.Task } ) func SyncContrib(t SyncContribTask) { - contribs := fetchContrib(t.Token, t.Owner, t.Repo) - for _, c := range contribs { - db.ImportRepo(c) - } -} - -func fetchContrib(token, owner, repo string) (res []*db.Contrib) { - client := newGithubClient(token) - contribs, resp, err := client.Repositories.ListContributorsStats(owner, repo) - saveResponseMeta(token, resp) + client := newGithubClient(t.Token) + contribs, resp, err := client.Repositories.ListContributorsStats(t.Owner, t.Repo) + saveResponseMeta(t.Token, resp) if err != nil { if err.Error() == "EOF" { // Empty repository, not an actual error @@ -42,14 +32,12 @@ func fetchContrib(token, owner, repo string) (res []*db.Contrib) { res = append(res, &db.Contrib{ Week: week.Week.Time.Unix(), Author: *c.Author.Login, - Owner: owner, - Repo: repo, + Owner: t.Owner, + Repo: t.Repo, Commits: *week.Commits, Additions: *week.Additions, Deletions: *week.Deletions, }) } } - - return } diff --git a/task/sync_repos.go b/task/sync_repos.go index e35d185..6fc108a 100644 --- a/task/sync_repos.go +++ b/task/sync_repos.go @@ -3,50 +3,37 @@ package task import ( "github.com/google/go-github/github" "github.com/localhots/steward/db" - "github.com/localhots/steward/job" ) type ( SyncReposTask struct { - Owner string - Token string - job.Task + db.Task } ) func SyncRepos(t SyncReposTask) { - repos := fetchRepos(t.Token, t.Owner) - for _, repo := range repos { - db.ImportRepo(&db.Repo{ - Owner: t.Owner, - Name: repo, - }) + client := newGithubClient(token) + names := []string{} + opt := &github.RepositoryListByOrgOptions{ + ListOptions: github.ListOptions{}, } -} - -func fetchRepos(token, owner string) { - var ( - client = newGithubClient(token) - names = []string{} - opt = &github.RepositoryListByOrgOptions{ - ListOptions: github.ListOptions{}, - } - ) for { opt.Page++ - repos, resp, err := client.Repositories.ListByOrg(owner, opt) - saveResponseMeta(token, resp) + repos, resp, err := client.Repositories.ListByOrg(t.Owner, opt) + saveResponseMeta(t.Token, resp) if err != nil { panic(err) } for _, repo := range repos { - names = append(names, *repo.Name) + r := &db.Repo{ + Owner: t.Owner, + Name: *repo.Name, + } + r.Save() } if len(repos) < 30 { break } } - - return names }