1
0
Fork 0

Use two db queueing interfaces: Now and Later

This commit is contained in:
Gregory Eremin 2015-03-21 22:19:32 +07:00
parent 68c1465c92
commit 5259b8552a
5 changed files with 57 additions and 65 deletions

View File

@ -11,7 +11,8 @@ import (
var ( var (
db *sqlx.DB db *sqlx.DB
queryQueue = make(chan func()) priorityQueue = make(chan func())
delayedQueue = make(chan func(), 1000)
) )
func Connect(params string) (err error) { func Connect(params string) (err error) {
@ -44,13 +45,22 @@ func mustSelectN(dest interface{}, query string, params interface{}) {
} }
} }
func Queue(fun func()) { func Now(fun func()) {
queryQueue <- fun priorityQueue <- fun
}
func Later(fun func()) {
delayedQueue <- fun
} }
func processQueue() { func processQueue() {
for { for {
(<-queryQueue)() var fun func()
select {
case fun = <-priorityQueue:
case fun = <-delayedQueue:
}
fun()
} }
} }

View File

@ -26,15 +26,13 @@ func Authenticate(code string) (token, login string, err error) {
return return
} }
login = user.Login login = user.Login
log.Println("Saving user", user) db.Later(func() { user.Save() })
db.Queue(func() { user.Save() })
tok := &db.Token{ tok := &db.Token{
User: login, User: login,
Token: token, Token: token,
} }
log.Println("Saving token", tok) db.Later(func() { tok.Save() })
db.Queue(func() { tok.Save() })
return return
} }
@ -64,6 +62,7 @@ func FetchAccessToken(code string) (token string, err error) {
if token = pairs.Get("access_token"); token == "" { if token = pairs.Get("access_token"); token == "" {
err = fmt.Errorf("Failed to fetch access token usign code %q: %s", code, pairs.Get("error_description")) err = fmt.Errorf("Failed to fetch access token usign code %q: %s", code, pairs.Get("error_description"))
} }
return return
} }

View File

@ -7,6 +7,36 @@ import (
"github.com/localhots/empact/db" "github.com/localhots/empact/db"
) )
func SyncUserInfo(token, login string) {
defer report(time.Now(), "SyncUserInfo (%s)", login)
client := newGithubClient(token)
var user *github.User
var resp *github.Response
var err error
if user, resp, err = client.Users.Get(login); err != nil {
panic(err)
}
saveResponseMeta(token, resp)
var name, avatarURL string
if user.Name != nil {
name = *user.Name
}
if user.AvatarURL != nil {
avatarURL = *user.AvatarURL
}
u := &db.User{
Login: *user.Login,
Name: name,
ID: *user.ID,
AvatarURL: avatarURL,
}
db.Now(func() { u.Save() })
return
}
func SyncOrgRepos(token string, org *db.Org) { func SyncOrgRepos(token string, org *db.Org) {
defer report(time.Now(), "SyncOrgRepos (%s)", org.Login) defer report(time.Now(), "SyncOrgRepos (%s)", org.Login)
client := newGithubClient(token) client := newGithubClient(token)
@ -36,7 +66,7 @@ func SyncOrgRepos(token string, org *db.Org) {
IsFork: *repo.Fork, IsFork: *repo.Fork,
} }
go SyncContrib(token, org.Login, r) go SyncContrib(token, org.Login, r)
db.Queue(func() { r.Save() }) db.Now(func() { r.Save() })
} }
if opt.Page >= resp.LastPage { if opt.Page >= resp.LastPage {
break break
@ -74,7 +104,7 @@ func SyncContrib(token, owner string, repo *db.Repo) {
Additions: *week.Additions, Additions: *week.Additions,
Deletions: *week.Deletions, Deletions: *week.Deletions,
} }
db.Queue(func() { c.Save() }) db.Now(func() { c.Save() })
} }
} }
} }
@ -111,7 +141,7 @@ func SyncUserOrgs(token string) {
go SyncOrgTeams(token, o) go SyncOrgTeams(token, o)
go SyncOrgMembers(token, o) go SyncOrgMembers(token, o)
go SyncOrgRepos(token, o) go SyncOrgRepos(token, o)
db.Queue(func() { o.Save() }) db.Now(func() { o.Save() })
} }
if opt.Page >= resp.LastPage { if opt.Page >= resp.LastPage {
break break
@ -146,7 +176,7 @@ func SyncOrgTeams(token string, org *db.Org) {
} }
go SyncTeamMembers(token, t) go SyncTeamMembers(token, t)
go SyncTeamRepos(token, t) go SyncTeamRepos(token, t)
db.Queue(func() { t.Save() }) db.Now(func() { t.Save() })
} }
if opt.Page >= resp.LastPage { if opt.Page >= resp.LastPage {
break break
@ -180,7 +210,7 @@ func SyncOrgMembers(token string, org *db.Org) {
break break
} }
} }
db.Queue(func() { db.SaveOrgMembers(org.ID, ids) }) db.Now(func() { db.SaveOrgMembers(org.ID, ids) })
return return
} }
@ -208,7 +238,7 @@ func SyncTeamMembers(token string, team *db.Team) {
break break
} }
} }
db.Queue(func() { db.SaveTeamMembers(team.OrgID, team.ID, ids) }) db.Now(func() { db.SaveTeamMembers(team.OrgID, team.ID, ids) })
return return
} }
@ -236,7 +266,7 @@ func SyncTeamRepos(token string, team *db.Team) {
break break
} }
} }
db.Queue(func() { db.SaveTeamRepos(team.OrgID, team.ID, ids) }) db.Now(func() { db.SaveTeamRepos(team.OrgID, team.ID, ids) })
return return
} }

View File

@ -21,13 +21,13 @@ func saveResponseMeta(token string, res *github.Response) {
if res == nil { if res == nil {
return return
} }
tok := &db.Token{ t := &db.Token{
Token: token, Token: token,
Quota: res.Limit, Quota: res.Limit,
Remaining: res.Remaining, Remaining: res.Remaining,
ResetAt: res.Reset.Time, ResetAt: res.Reset.Time,
} }
db.Queue(func() { tok.Save() }) db.Later(func() { t.Save() })
} }
func report(start time.Time, format string, args ...interface{}) { func report(start time.Time, format string, args ...interface{}) {

View File

@ -1,47 +0,0 @@
package task
import (
"time"
"github.com/google/go-github/github"
"github.com/localhots/empact/db"
)
func FetchUserInfo(token, login string) (u *db.User, err error) {
defer report(time.Now(), "FetchUserInfo (%s)", login)
client := newGithubClient(token)
var user *github.User
var resp *github.Response
if user, resp, err = client.Users.Get(login); err != nil {
return
}
saveResponseMeta(token, resp)
name := ""
if n := user.Name; n != nil {
name = *user.Name
}
avatarURL := ""
if url := user.AvatarURL; url != nil {
avatarURL = *user.AvatarURL
}
u = &db.User{
Login: *user.Login,
Name: name,
ID: *user.ID,
AvatarURL: avatarURL,
}
return
}
func SyncUserInfo(token, login string) (err error) {
defer report(time.Now(), "SyncUserInfo (%s)", login)
var u *db.User
if u, err = FetchUserInfo(token, login); err == nil {
db.Queue(func() { u.Save() })
}
return
}