Add consumer support to satan
This commit is contained in:
parent
8dd218a366
commit
4db43004ae
17
satan.go
17
satan.go
@ -4,21 +4,28 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/localhots/satan/backend"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Satan is the master daemon.
|
// Satan is the master daemon.
|
||||||
type Satan struct {
|
type Satan struct {
|
||||||
daemons []Daemon
|
daemons []Daemon
|
||||||
|
backend backend.Backend
|
||||||
queue chan *task
|
queue chan *task
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
latency *statistics
|
latency *statistics
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Actor is a function that could be executed by daemon workers.
|
||||||
|
type Actor func()
|
||||||
|
|
||||||
type task struct {
|
type task struct {
|
||||||
daemon Daemon
|
daemon Daemon
|
||||||
actor Actor
|
actor Actor
|
||||||
createdAt time.Time
|
createdAt time.Time
|
||||||
|
system bool
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -36,7 +43,10 @@ func Summon() *Satan {
|
|||||||
|
|
||||||
// AddDaemon adds a new daemon.
|
// AddDaemon adds a new daemon.
|
||||||
func (s *Satan) AddDaemon(d Daemon) {
|
func (s *Satan) AddDaemon(d Daemon) {
|
||||||
d.base().initialize(d, s.queue)
|
d.initialize(d, s.queue)
|
||||||
|
if c, ok := d.(Consumer); ok {
|
||||||
|
c.setBackend(s.backend)
|
||||||
|
}
|
||||||
go d.Startup()
|
go d.Startup()
|
||||||
|
|
||||||
s.daemons = append(s.daemons, d)
|
s.daemons = append(s.daemons, d)
|
||||||
@ -55,6 +65,8 @@ func (s *Satan) StartDaemons() {
|
|||||||
|
|
||||||
// StopDaemons stops all running daemons.
|
// StopDaemons stops all running daemons.
|
||||||
func (s *Satan) StopDaemons() {
|
func (s *Satan) StopDaemons() {
|
||||||
|
// First closing backend consumers will begin to close
|
||||||
|
s.backend.Close()
|
||||||
for _, d := range s.daemons {
|
for _, d := range s.daemons {
|
||||||
close(d.base().shutdown)
|
close(d.base().shutdown)
|
||||||
d.Shutdown()
|
d.Shutdown()
|
||||||
@ -62,8 +74,9 @@ func (s *Satan) StopDaemons() {
|
|||||||
log.Printf("%s daemon performace statistics:\n%s\n",
|
log.Printf("%s daemon performace statistics:\n%s\n",
|
||||||
d.base(), d.base().stats.snapshot())
|
d.base(), d.base().stats.snapshot())
|
||||||
}
|
}
|
||||||
close(s.queue)
|
close(s.shutdown)
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
close(s.queue)
|
||||||
|
|
||||||
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
|
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user