From 5259b8552a832bd3388c61b087e00a62c773afa7 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sat, 21 Mar 2015 22:19:32 +0700 Subject: [PATCH] Use two db queueing interfaces: Now and Later --- db/db.go | 20 +++++++++++++++----- task/auth.go | 7 +++---- task/sync.go | 44 +++++++++++++++++++++++++++++++++++++------- task/task.go | 4 ++-- task/user.go | 47 ----------------------------------------------- 5 files changed, 57 insertions(+), 65 deletions(-) delete mode 100644 task/user.go diff --git a/db/db.go b/db/db.go index 4b16e5c..b925076 100644 --- a/db/db.go +++ b/db/db.go @@ -10,8 +10,9 @@ import ( ) var ( - db *sqlx.DB - queryQueue = make(chan func()) + db *sqlx.DB + priorityQueue = make(chan func()) + delayedQueue = make(chan func(), 1000) ) func Connect(params string) (err error) { @@ -44,13 +45,22 @@ func mustSelectN(dest interface{}, query string, params interface{}) { } } -func Queue(fun func()) { - queryQueue <- fun +func Now(fun func()) { + priorityQueue <- fun +} + +func Later(fun func()) { + delayedQueue <- fun } func processQueue() { for { - (<-queryQueue)() + var fun func() + select { + case fun = <-priorityQueue: + case fun = <-delayedQueue: + } + fun() } } diff --git a/task/auth.go b/task/auth.go index 8fb7c7c..61b40a3 100644 --- a/task/auth.go +++ b/task/auth.go @@ -26,15 +26,13 @@ func Authenticate(code string) (token, login string, err error) { return } login = user.Login - log.Println("Saving user", user) - db.Queue(func() { user.Save() }) + db.Later(func() { user.Save() }) tok := &db.Token{ User: login, Token: token, } - log.Println("Saving token", tok) - db.Queue(func() { tok.Save() }) + db.Later(func() { tok.Save() }) return } @@ -64,6 +62,7 @@ func FetchAccessToken(code string) (token string, err error) { if token = pairs.Get("access_token"); token == "" { err = fmt.Errorf("Failed to fetch access token usign code %q: %s", code, pairs.Get("error_description")) } + return } diff --git a/task/sync.go b/task/sync.go index c6eb602..3001516 100644 --- a/task/sync.go +++ b/task/sync.go @@ -7,6 +7,36 @@ import ( "github.com/localhots/empact/db" ) +func SyncUserInfo(token, login string) { + defer report(time.Now(), "SyncUserInfo (%s)", login) + client := newGithubClient(token) + + var user *github.User + var resp *github.Response + var err error + if user, resp, err = client.Users.Get(login); err != nil { + panic(err) + } + saveResponseMeta(token, resp) + + var name, avatarURL string + if user.Name != nil { + name = *user.Name + } + if user.AvatarURL != nil { + avatarURL = *user.AvatarURL + } + u := &db.User{ + Login: *user.Login, + Name: name, + ID: *user.ID, + AvatarURL: avatarURL, + } + db.Now(func() { u.Save() }) + + return +} + func SyncOrgRepos(token string, org *db.Org) { defer report(time.Now(), "SyncOrgRepos (%s)", org.Login) client := newGithubClient(token) @@ -36,7 +66,7 @@ func SyncOrgRepos(token string, org *db.Org) { IsFork: *repo.Fork, } go SyncContrib(token, org.Login, r) - db.Queue(func() { r.Save() }) + db.Now(func() { r.Save() }) } if opt.Page >= resp.LastPage { break @@ -74,7 +104,7 @@ func SyncContrib(token, owner string, repo *db.Repo) { Additions: *week.Additions, Deletions: *week.Deletions, } - db.Queue(func() { c.Save() }) + db.Now(func() { c.Save() }) } } } @@ -111,7 +141,7 @@ func SyncUserOrgs(token string) { go SyncOrgTeams(token, o) go SyncOrgMembers(token, o) go SyncOrgRepos(token, o) - db.Queue(func() { o.Save() }) + db.Now(func() { o.Save() }) } if opt.Page >= resp.LastPage { break @@ -146,7 +176,7 @@ func SyncOrgTeams(token string, org *db.Org) { } go SyncTeamMembers(token, t) go SyncTeamRepos(token, t) - db.Queue(func() { t.Save() }) + db.Now(func() { t.Save() }) } if opt.Page >= resp.LastPage { break @@ -180,7 +210,7 @@ func SyncOrgMembers(token string, org *db.Org) { break } } - db.Queue(func() { db.SaveOrgMembers(org.ID, ids) }) + db.Now(func() { db.SaveOrgMembers(org.ID, ids) }) return } @@ -208,7 +238,7 @@ func SyncTeamMembers(token string, team *db.Team) { break } } - db.Queue(func() { db.SaveTeamMembers(team.OrgID, team.ID, ids) }) + db.Now(func() { db.SaveTeamMembers(team.OrgID, team.ID, ids) }) return } @@ -236,7 +266,7 @@ func SyncTeamRepos(token string, team *db.Team) { break } } - db.Queue(func() { db.SaveTeamRepos(team.OrgID, team.ID, ids) }) + db.Now(func() { db.SaveTeamRepos(team.OrgID, team.ID, ids) }) return } diff --git a/task/task.go b/task/task.go index ab5ae73..3255ec0 100644 --- a/task/task.go +++ b/task/task.go @@ -21,13 +21,13 @@ func saveResponseMeta(token string, res *github.Response) { if res == nil { return } - tok := &db.Token{ + t := &db.Token{ Token: token, Quota: res.Limit, Remaining: res.Remaining, ResetAt: res.Reset.Time, } - db.Queue(func() { tok.Save() }) + db.Later(func() { t.Save() }) } func report(start time.Time, format string, args ...interface{}) { diff --git a/task/user.go b/task/user.go deleted file mode 100644 index 113ef9b..0000000 --- a/task/user.go +++ /dev/null @@ -1,47 +0,0 @@ -package task - -import ( - "time" - - "github.com/google/go-github/github" - "github.com/localhots/empact/db" -) - -func FetchUserInfo(token, login string) (u *db.User, err error) { - defer report(time.Now(), "FetchUserInfo (%s)", login) - client := newGithubClient(token) - var user *github.User - var resp *github.Response - if user, resp, err = client.Users.Get(login); err != nil { - return - } - saveResponseMeta(token, resp) - - name := "" - if n := user.Name; n != nil { - name = *user.Name - } - - avatarURL := "" - if url := user.AvatarURL; url != nil { - avatarURL = *user.AvatarURL - } - - u = &db.User{ - Login: *user.Login, - Name: name, - ID: *user.ID, - AvatarURL: avatarURL, - } - - return -} - -func SyncUserInfo(token, login string) (err error) { - defer report(time.Now(), "SyncUserInfo (%s)", login) - var u *db.User - if u, err = FetchUserInfo(token, login); err == nil { - db.Queue(func() { u.Save() }) - } - return -}