We don't need jobs!
This commit is contained in:
		
							parent
							
								
									3e0ac1ea2d
								
							
						
					
					
						commit
						522d53b14b
					
				
							
								
								
									
										89
									
								
								job/job.go
									
									
									
									
									
								
							
							
						
						
									
										89
									
								
								job/job.go
									
									
									
									
									
								
							@ -1,89 +0,0 @@
 | 
			
		||||
package job
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"code.google.com/p/go-uuid/uuid"
 | 
			
		||||
 | 
			
		||||
	"github.com/fatih/structs"
 | 
			
		||||
	"github.com/localhots/empact/task"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	Job struct {
 | 
			
		||||
		Name    string
 | 
			
		||||
		actor   func(task.Tasker)
 | 
			
		||||
		workers map[string]*worker
 | 
			
		||||
		orders  chan struct{} // Currently shutdown only
 | 
			
		||||
		tasks   chan task.Tasker
 | 
			
		||||
		wg      sync.WaitGroup
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	jobs = map[string]*Job{}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Enqueue(t task.Tasker) {
 | 
			
		||||
	dt := t.T()
 | 
			
		||||
	dt.Job = structs.Name(t)
 | 
			
		||||
	dt.CreatedAt = time.Now()
 | 
			
		||||
 | 
			
		||||
	j, ok := jobs[dt.Job]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		switch dt.Job {
 | 
			
		||||
		case "FetchAccessTokenTask":
 | 
			
		||||
			j = New(dt.Job, task.FetchAccessToken)
 | 
			
		||||
		case "SyncContribTask":
 | 
			
		||||
			j = New(dt.Job, task.SyncContrib)
 | 
			
		||||
		case "SyncReposTask":
 | 
			
		||||
			j = New(dt.Job, task.SyncRepos)
 | 
			
		||||
		default:
 | 
			
		||||
			panic("Unknown task: " + dt.Job)
 | 
			
		||||
		}
 | 
			
		||||
		jobs[dt.Job] = j
 | 
			
		||||
		j.Resize(1)
 | 
			
		||||
	}
 | 
			
		||||
	j.tasks <- t
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(name string, actor func(task.Tasker)) *Job {
 | 
			
		||||
	return &Job{
 | 
			
		||||
		Name:    name,
 | 
			
		||||
		actor:   actor,
 | 
			
		||||
		workers: make(map[string]*worker),
 | 
			
		||||
		orders:  make(chan struct{}),
 | 
			
		||||
		tasks:   make(chan task.Tasker, 1000),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (j *Job) Size() int {
 | 
			
		||||
	return len(j.workers)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (j *Job) Stop() {
 | 
			
		||||
	j.Resize(0)
 | 
			
		||||
	j.wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (j *Job) Resize(n int) {
 | 
			
		||||
	if n < 0 {
 | 
			
		||||
		n = 0
 | 
			
		||||
	}
 | 
			
		||||
	if del := n - len(j.workers); del > 0 {
 | 
			
		||||
		for i := 0; i < del; i++ {
 | 
			
		||||
			w := &worker{
 | 
			
		||||
				id:  uuid.New(),
 | 
			
		||||
				job: j,
 | 
			
		||||
			}
 | 
			
		||||
			go w.workHard()
 | 
			
		||||
			j.workers[w.id] = w
 | 
			
		||||
		}
 | 
			
		||||
		j.wg.Add(del)
 | 
			
		||||
	} else {
 | 
			
		||||
		for i := 0; i > del; i-- {
 | 
			
		||||
			j.orders <- struct{}{}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -1,43 +0,0 @@
 | 
			
		||||
package job
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/localhots/empact/task"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	worker struct {
 | 
			
		||||
		id  string
 | 
			
		||||
		job *Job
 | 
			
		||||
		wg  *sync.WaitGroup
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func (w *worker) workHard() {
 | 
			
		||||
	defer w.wg.Done()
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-w.job.orders:
 | 
			
		||||
			return
 | 
			
		||||
		case t := <-w.job.tasks:
 | 
			
		||||
			w.perform(t)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) perform(t task.Tasker) {
 | 
			
		||||
	dt := t.T()
 | 
			
		||||
	dt.Worker = w.id
 | 
			
		||||
	dt.StartedAt = time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err := recover(); err != nil {
 | 
			
		||||
			// dt.Error = err.(string)
 | 
			
		||||
		}
 | 
			
		||||
		dt.Duration = time.Since(dt.StartedAt).Nanoseconds()
 | 
			
		||||
		dt.Save()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	w.job.actor(t)
 | 
			
		||||
}
 | 
			
		||||
@ -12,20 +12,11 @@ import (
 | 
			
		||||
	"github.com/localhots/empact/db"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	FetchAccessTokenTask struct {
 | 
			
		||||
		Code   string
 | 
			
		||||
		Result chan string
 | 
			
		||||
		*db.Task
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func FetchAccessToken(tk Tasker) {
 | 
			
		||||
	t := tk.(*FetchAccessTokenTask)
 | 
			
		||||
func FetchAccessToken(code string, result chan string) {
 | 
			
		||||
	payload := url.Values{}
 | 
			
		||||
	payload.Set("client_id", config.C().ClientID)
 | 
			
		||||
	payload.Set("client_secret", config.C().ClientSecret)
 | 
			
		||||
	payload.Set("code", t.Code)
 | 
			
		||||
	payload.Set("code", code)
 | 
			
		||||
	payload.Set("redirect_uri", config.C().RedirectURI)
 | 
			
		||||
 | 
			
		||||
	buf := bytes.NewBuffer([]byte(payload.Encode()))
 | 
			
		||||
@ -72,5 +63,5 @@ func FetchAccessToken(tk Tasker) {
 | 
			
		||||
	fmt.Println("Saving token", tok)
 | 
			
		||||
	tok.Save()
 | 
			
		||||
 | 
			
		||||
	t.Result <- user.Login
 | 
			
		||||
	result <- user.Login
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -6,13 +6,6 @@ import (
 | 
			
		||||
	"github.com/localhots/empact/db"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	Tasker interface {
 | 
			
		||||
		Save()
 | 
			
		||||
		T() *db.Task
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func newGithubClient(token string) *github.Client {
 | 
			
		||||
	trans := &oauth.Transport{
 | 
			
		||||
		Token: &oauth.Token{AccessToken: token},
 | 
			
		||||
 | 
			
		||||
@ -4,22 +4,13 @@ import (
 | 
			
		||||
	"github.com/localhots/empact/db"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	SyncContribTask struct {
 | 
			
		||||
		Repo string
 | 
			
		||||
		*db.Task
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func SyncContrib(tk Tasker) {
 | 
			
		||||
	t := tk.(*SyncContribTask)
 | 
			
		||||
	client := newGithubClient(t.Token)
 | 
			
		||||
	contribs, resp, err := client.Repositories.ListContributorsStats(t.Owner, t.Repo)
 | 
			
		||||
	saveResponseMeta(t.Token, resp)
 | 
			
		||||
func SyncContrib(token, owner, repo string) {
 | 
			
		||||
	client := newGithubClient(token)
 | 
			
		||||
	contribs, resp, err := client.Repositories.ListContributorsStats(owner, repo)
 | 
			
		||||
	saveResponseMeta(token, resp)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if err.Error() == "EOF" {
 | 
			
		||||
			// Empty repository, not an actual error
 | 
			
		||||
			return
 | 
			
		||||
			return // Empty repository, not an actual error
 | 
			
		||||
		}
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -5,29 +5,22 @@ import (
 | 
			
		||||
	"github.com/localhots/empact/db"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	SyncReposTask struct {
 | 
			
		||||
		*db.Task
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func SyncRepos(tk Tasker) {
 | 
			
		||||
	t := tk.(*SyncReposTask)
 | 
			
		||||
	client := newGithubClient(t.Token)
 | 
			
		||||
func SyncRepos(token, owner string) {
 | 
			
		||||
	client := newGithubClient(token)
 | 
			
		||||
	opt := &github.RepositoryListByOrgOptions{
 | 
			
		||||
		ListOptions: github.ListOptions{},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		opt.Page++
 | 
			
		||||
		repos, resp, err := client.Repositories.ListByOrg(t.Owner, opt)
 | 
			
		||||
		saveResponseMeta(t.Token, resp)
 | 
			
		||||
		repos, resp, err := client.Repositories.ListByOrg(owner, opt)
 | 
			
		||||
		saveResponseMeta(token, resp)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		for _, repo := range repos {
 | 
			
		||||
			r := &db.Repo{
 | 
			
		||||
				Owner: t.Owner,
 | 
			
		||||
				Owner: owner,
 | 
			
		||||
				Name:  *repo.Name,
 | 
			
		||||
			}
 | 
			
		||||
			r.Save()
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user