1
0
Fork 0

Task reports

This commit is contained in:
Gregory Eremin 2015-03-05 03:51:17 +07:00
parent 8ac16bb695
commit ed666552ef
8 changed files with 80 additions and 86 deletions

View File

@ -17,13 +17,13 @@ type (
)
const (
importContribQuery = "" +
saveContribQuery = "" +
"replace into contributions (week, author, owner, repo, commits, additions, deletions) " +
"values (?, ?, ?, ?, ?, ?, ?)"
)
func ImportContrib(c *Contrib) {
if _, err := stmt(importContribQuery).Exec(structs.Values(c)); err != nil {
func (c *Contrib) Save() {
if _, err := stmt(saveContribQuery).Exec(structs.Values(c)); err != nil {
panic(err)
}
}

View File

@ -12,11 +12,11 @@ type (
)
const (
repoImportQuery = "replace into repos (owner, name, updated_at) values (?, ?, now())"
saveRepoQuery = "replace into repos (owner, name, updated_at) values (?, ?, now())"
)
func ImportRepo(r *Repo) {
if _, err := stmt(repoImportQuery).Exec(structs.Values(r)); err != nil {
func (r *Repo) Save() {
if _, err := stmt(saveRepoQuery).Exec(structs.Values(r)); err != nil {
panic(err)
}
}

32
db/task.go Normal file
View File

@ -0,0 +1,32 @@
package db
import (
"time"
"github.com/fatih/structs"
)
type (
Task struct {
Token string
Owner string
Job string
Worker string
Duration int64
Error string
CreatedAt time.Time
StartedAt time.Time
}
)
const (
saveTaskQuery = "" +
"insert into tasks (token, owner, job, worker, duration, error, created_at, started_at) " +
"values (?, ?, ?, ?, ?, ?, ?, ?)"
)
func (t *Task) Save() {
if _, err := stmt(saveTaskQuery).Exec(structs.Values(t)); err != nil {
panic(err)
}
}

View File

@ -2,14 +2,18 @@ package job
import (
"sync"
"time"
"github.com/fatih/structs"
"github.com/localhots/steward/db"
)
type (
Job struct {
Name string
actor func(Task)
actor func(*db.Task)
workers map[string]*worker
tasks chan Task
tasks chan *db.Task
wg sync.WaitGroup
}
)
@ -19,11 +23,13 @@ func New(name string, actor func()) *Job {
Name: name,
actor: actor,
workers: make(map[string]*worker),
tasks: make(chan Task),
tasks: make(chan *db.Task, 1000),
}
}
func (j *Job) Perform(t Task) {
func (j *Job) Perform(t *db.Task) {
t.Job = structs.Name(t)
t.CreatedAt = time.Now()
j.tasks <- t
}

View File

@ -1,26 +0,0 @@
package job
import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/fatih/structs"
)
type (
Task struct{}
report struct {
duration time.Duration
err error
}
)
func newID() string {
return uuid.New()
}
func (t *Task) report(rep report) {
meta := structs.Map(t)
meta["duration"] = rep.duration
meta["error"] = rep.err
}

View File

@ -2,6 +2,9 @@ package job
import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/localhots/steward/db"
)
type (
@ -24,15 +27,19 @@ func (w *worker) workHard() {
}
}
func (w *worker) perform(t Task) {
start := time.Now()
func (w *worker) perform(t *db.Task) {
t.Worker = w.id
t.StartedAt = time.Now()
defer func() {
err := recover()
t.report(report{
duration: time.Since(start),
success: err,
})
t.Duration = time.Since(t.StartedAt).Nanoseconds()
t.Error = err.String()
t.Save()
}()
w.job.actor(t)
}
func newID() string {
return uuid.New()
}

View File

@ -2,29 +2,19 @@ package task
import (
"github.com/localhots/steward/db"
"github.com/localhots/steward/job"
)
type (
SyncContribTask struct {
Owner string
Repo string
Token string
job.Task
Repo string
db.Task
}
)
func SyncContrib(t SyncContribTask) {
contribs := fetchContrib(t.Token, t.Owner, t.Repo)
for _, c := range contribs {
db.ImportRepo(c)
}
}
func fetchContrib(token, owner, repo string) (res []*db.Contrib) {
client := newGithubClient(token)
contribs, resp, err := client.Repositories.ListContributorsStats(owner, repo)
saveResponseMeta(token, resp)
client := newGithubClient(t.Token)
contribs, resp, err := client.Repositories.ListContributorsStats(t.Owner, t.Repo)
saveResponseMeta(t.Token, resp)
if err != nil {
if err.Error() == "EOF" {
// Empty repository, not an actual error
@ -42,14 +32,12 @@ func fetchContrib(token, owner, repo string) (res []*db.Contrib) {
res = append(res, &db.Contrib{
Week: week.Week.Time.Unix(),
Author: *c.Author.Login,
Owner: owner,
Repo: repo,
Owner: t.Owner,
Repo: t.Repo,
Commits: *week.Commits,
Additions: *week.Additions,
Deletions: *week.Deletions,
})
}
}
return
}

View File

@ -3,50 +3,37 @@ package task
import (
"github.com/google/go-github/github"
"github.com/localhots/steward/db"
"github.com/localhots/steward/job"
)
type (
SyncReposTask struct {
Owner string
Token string
job.Task
db.Task
}
)
func SyncRepos(t SyncReposTask) {
repos := fetchRepos(t.Token, t.Owner)
for _, repo := range repos {
db.ImportRepo(&db.Repo{
Owner: t.Owner,
Name: repo,
})
client := newGithubClient(token)
names := []string{}
opt := &github.RepositoryListByOrgOptions{
ListOptions: github.ListOptions{},
}
}
func fetchRepos(token, owner string) {
var (
client = newGithubClient(token)
names = []string{}
opt = &github.RepositoryListByOrgOptions{
ListOptions: github.ListOptions{},
}
)
for {
opt.Page++
repos, resp, err := client.Repositories.ListByOrg(owner, opt)
saveResponseMeta(token, resp)
repos, resp, err := client.Repositories.ListByOrg(t.Owner, opt)
saveResponseMeta(t.Token, resp)
if err != nil {
panic(err)
}
for _, repo := range repos {
names = append(names, *repo.Name)
r := &db.Repo{
Owner: t.Owner,
Name: *repo.Name,
}
r.Save()
}
if len(repos) < 30 {
break
}
}
return names
}