diff --git a/.gitignore b/.gitignore index d344ba6..87f58f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ config.json +*.txt diff --git a/config.example.json b/config.example.json index 5e927e1..452275b 100644 --- a/config.example.json +++ b/config.example.json @@ -1,4 +1,8 @@ { + "app_domain": "localhost", + "database_uri": "root@/empact", + "github_auth_url": "https://github.com/login/oauth/authorize", + "github_access_token_url": "https://github.com/login/oauth/access_token", "github_client_id": "XXXXXXXXXXXXXXXXXXXX", "github_client_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "github_redirect_uri": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" diff --git a/db/token.go b/db/token.go index 25bc029..fd47dc2 100644 --- a/db/token.go +++ b/db/token.go @@ -15,3 +15,7 @@ type ( CreatedAt time.Time } ) + +func UpdateToken(t *Token) { + +} diff --git a/job/job.go b/job/job.go index ebc1035..7c5a753 100644 --- a/job/job.go +++ b/job/job.go @@ -4,35 +4,60 @@ import ( "sync" "time" + "code.google.com/p/go-uuid/uuid" + "github.com/fatih/structs" - "github.com/localhots/empact/db" + "github.com/localhots/empact/task" ) type ( Job struct { Name string - actor func(*db.Task) + actor func(task.Tasker) workers map[string]*worker - tasks chan *db.Task + orders chan struct{} // Currently shutdown only + tasks chan task.Tasker wg sync.WaitGroup } ) -func New(name string, actor func()) *Job { - &Job{ +var ( + jobs = map[string]*Job{} +) + +func Enqueue(t task.Tasker) { + dt := t.T() + dt.Job = structs.Name(t) + dt.CreatedAt = time.Now() + + j, ok := jobs[dt.Job] + if !ok { + switch dt.Job { + case "FetchAccessTokenTask": + j = New(dt.Job, task.FetchAccessToken) + case "SyncContribTask": + j = New(dt.Job, task.SyncContrib) + case "SyncReposTask": + j = New(dt.Job, task.SyncRepos) + default: + panic("Unknown task: " + dt.Job) + } + jobs[dt.Job] = j + j.Resize(1) + } + j.tasks <- t +} + +func New(name string, actor func(task.Tasker)) *Job { + return &Job{ Name: name, actor: actor, workers: make(map[string]*worker), - tasks: make(chan *db.Task, 1000), + orders: make(chan struct{}), + tasks: make(chan task.Tasker, 1000), } } -func (j *Job) Perform(t *db.Task) { - t.Job = structs.Name(t) - t.CreatedAt = time.Now() - j.tasks <- t -} - func (j *Job) Size() int { return len(j.workers) } @@ -49,9 +74,8 @@ func (j *Job) Resize(n int) { if del := n - len(j.workers); del > 0 { for i := 0; i < del; i++ { w := &worker{ - id: newID(), - job: j, - shutdown: make(<-chan struct{}, 1), + id: uuid.New(), + job: j, } go w.workHard() j.workers[w.id] = w @@ -59,7 +83,7 @@ func (j *Job) Resize(n int) { j.wg.Add(del) } else { for i := 0; i > del; i-- { - j.shutdown <- struct{}{} + j.orders <- struct{}{} } } } diff --git a/job/worker.go b/job/worker.go index b26a38c..754465c 100644 --- a/job/worker.go +++ b/job/worker.go @@ -1,17 +1,17 @@ package job import ( + "sync" "time" - "code.google.com/p/go-uuid/uuid" - "github.com/localhots/empact/db" + "github.com/localhots/empact/task" ) type ( worker struct { - id string - job *Job - shutdown <-chan struct{} + id string + job *Job + wg *sync.WaitGroup } ) @@ -19,7 +19,7 @@ func (w *worker) workHard() { defer w.wg.Done() for { select { - case <-w.shutdown: + case <-w.job.orders: return case t := <-w.job.tasks: w.perform(t) @@ -27,19 +27,17 @@ func (w *worker) workHard() { } } -func (w *worker) perform(t *db.Task) { - t.Worker = w.id - t.StartedAt = time.Now() +func (w *worker) perform(t task.Tasker) { + dt := t.T() + dt.Worker = w.id + dt.StartedAt = time.Now() defer func() { - err := recover() - t.Duration = time.Since(t.StartedAt).Nanoseconds() - t.Error = err.String() - t.Save() + if err := recover(); err != nil { + // dt.Error = err.(string) + } + dt.Duration = time.Since(dt.StartedAt).Nanoseconds() + dt.Save() }() w.job.actor(t) } - -func newID() string { - return uuid.New() -} diff --git a/main.go b/main.go new file mode 100644 index 0000000..3a0fac3 --- /dev/null +++ b/main.go @@ -0,0 +1,15 @@ +package main + +import ( + "github.com/localhots/empact/config" + "github.com/localhots/empact/db" + "github.com/localhots/empact/server" +) + +func main() { + if err := db.Connect(config.C().DatabaseURI); err != nil { + panic(err) + } + + server.Start() +} diff --git a/server/auth.go b/server/auth.go index 425b50b..134fa7f 100644 --- a/server/auth.go +++ b/server/auth.go @@ -1,18 +1,14 @@ package server import ( - "bytes" "fmt" - "io/ioutil" "net/http" "net/url" "github.com/localhots/empact/config" -) - -const ( - authURL = "https://github.com/login/oauth/authorize" - accessTokenURL = "https://github.com/login/oauth/access_token" + "github.com/localhots/empact/db" + "github.com/localhots/empact/job" + "github.com/localhots/empact/task" ) func authSigninHandler(w http.ResponseWriter, r *http.Request) { @@ -20,39 +16,28 @@ func authSigninHandler(w http.ResponseWriter, r *http.Request) { params.Set("client_id", config.C().ClientID) params.Set("redirect_uri", config.C().RedirectURI) params.Set("scope", "repo") - http.Redirect(w, r, authURL+"?"+params.Encode(), 302) + http.Redirect(w, r, config.C().AuthURL+"?"+params.Encode(), 302) } func authCallbackHandler(w http.ResponseWriter, r *http.Request) { if r.FormValue("error") != "" { w.Write([]byte(r.FormValue("error_description"))) } else { - fmt.Println("Got code: ", r.FormValue("code")) - token := getAccessToken(r.FormValue("code")) - fmt.Println("Got access token: ", token) - w.Write([]byte(token)) + code := r.FormValue("code") + fmt.Println("Got code: ", code) + + res := make(chan string) + job.Enqueue(&task.FetchAccessTokenTask{ + Code: code, + Result: res, + Task: &db.Task{}, + }) + + if token, ok := <-res; ok { + fmt.Println("Got access token: ", token) + w.Write([]byte(token)) + } else { + panic("Failed to fetch token") + } } } - -func getAccessToken(code string) string { - payload := url.Values{} - payload.Set("client_id", config.C().ClientID) - payload.Set("client_secret", config.C().ClientSecret) - payload.Set("code", code) - payload.Set("redirect_uri", config.C().RedirectURI) - - buf := bytes.NewBuffer([]byte(payload.Encode())) - resp, err := http.Post(accessTokenURL, "application/x-www-form-urlencoded", buf) - if err != nil { - panic(err) - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err) - } - - pairs, _ := url.ParseQuery(string(body)) - return pairs.Get("access_token") -} diff --git a/server/server.go b/server/server.go index d33fc9d..2272fad 100644 --- a/server/server.go +++ b/server/server.go @@ -1,11 +1,39 @@ package server import ( + "fmt" "net/http" + "time" + + "code.google.com/p/go-uuid/uuid" +) + +const ( + sessionCookie = "session_id" ) func Start() { + fmt.Println("Starting server at http://localhost:8080") + http.HandleFunc("/", sessionHandler) http.HandleFunc("/auth/signin", authSigninHandler) http.HandleFunc("/auth/callback", authCallbackHandler) http.ListenAndServe(":8080", nil) } + +func sessionHandler(w http.ResponseWriter, r *http.Request) { + if cook, err := r.Cookie(sessionCookie); err != nil { + cook = &http.Cookie{ + Name: sessionCookie, + Value: uuid.New(), + Path: "/", + Expires: time.Now().Add(365 * 24 * time.Hour), + HttpOnly: true, + } + http.SetCookie(w, cook) + } +} + +func sessionID(r *http.Request) string { + cook, _ := r.Cookie(sessionCookie) + return cook.Value +} diff --git a/task/access_token.go b/task/access_token.go new file mode 100644 index 0000000..2b4a0e8 --- /dev/null +++ b/task/access_token.go @@ -0,0 +1,44 @@ +package task + +import ( + "bytes" + "io/ioutil" + "net/http" + "net/url" + + "github.com/localhots/empact/config" + "github.com/localhots/empact/db" +) + +type ( + FetchAccessTokenTask struct { + Code string + Result chan string + *db.Task + } +) + +func FetchAccessToken(tk Tasker) { + t := tk.(*FetchAccessTokenTask) + payload := url.Values{} + payload.Set("client_id", config.C().ClientID) + payload.Set("client_secret", config.C().ClientSecret) + payload.Set("code", t.Code) + payload.Set("redirect_uri", config.C().RedirectURI) + + buf := bytes.NewBuffer([]byte(payload.Encode())) + resp, err := http.Post(config.C().AccessTokenURL, "application/x-www-form-urlencoded", buf) + if err != nil { + panic(err) + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + panic(err) + } + + pairs, _ := url.ParseQuery(string(body)) + + t.Result <- pairs.Get("access_token") +} diff --git a/task/common.go b/task/common.go index 0f38f59..18b394a 100644 --- a/task/common.go +++ b/task/common.go @@ -6,6 +6,13 @@ import ( "github.com/localhots/empact/db" ) +type ( + Tasker interface { + Save() + T() *db.Task + } +) + func newGithubClient(token string) *github.Client { trans := &oauth.Transport{ Token: &oauth.Token{AccessToken: token}, diff --git a/task/sync_contrib.go b/task/sync_contrib.go index 2fbadbf..f1d7aa9 100644 --- a/task/sync_contrib.go +++ b/task/sync_contrib.go @@ -7,11 +7,12 @@ import ( type ( SyncContribTask struct { Repo string - db.Task + *db.Task } ) -func SyncContrib(t SyncContribTask) { +func SyncContrib(tk Tasker) { + t := tk.(*SyncContribTask) client := newGithubClient(t.Token) contribs, resp, err := client.Repositories.ListContributorsStats(t.Owner, t.Repo) saveResponseMeta(t.Token, resp) diff --git a/task/sync_repos.go b/task/sync_repos.go index af7a3e1..0dad71c 100644 --- a/task/sync_repos.go +++ b/task/sync_repos.go @@ -7,13 +7,13 @@ import ( type ( SyncReposTask struct { - db.Task + *db.Task } ) -func SyncRepos(t SyncReposTask) { - client := newGithubClient(token) - names := []string{} +func SyncRepos(tk Tasker) { + t := tk.(*SyncReposTask) + client := newGithubClient(t.Token) opt := &github.RepositoryListByOrgOptions{ ListOptions: github.ListOptions{}, }