Make Satan not backend opinionated
This commit is contained in:
parent
dff6c81d04
commit
87b8e851cc
57
satan.go
57
satan.go
|
@ -4,24 +4,40 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/localhots/satan/backend"
|
|
||||||
"github.com/localhots/satan/backend/kafka"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Satan is the master daemon.
|
// Satan is the master daemon.
|
||||||
type Satan struct {
|
type Satan struct {
|
||||||
|
SubscribeFunc SubscribeFunc
|
||||||
|
Publisher Publisher
|
||||||
|
|
||||||
daemons []Daemon
|
daemons []Daemon
|
||||||
backend backend.Backend
|
|
||||||
queue chan *task
|
queue chan *task
|
||||||
|
shutdown chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
latency *statistics
|
latency *statistics
|
||||||
shutdown chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Actor is a function that could be executed by daemon workers.
|
// Actor is a function that could be executed by daemon workers.
|
||||||
type Actor func()
|
type Actor func()
|
||||||
|
|
||||||
|
// SubscribeFunc is a function that is used by daemons to subscribe to messages.
|
||||||
|
type SubscribeFunc func(consumer, topic string) Streamer
|
||||||
|
|
||||||
|
// Streamer is the interface that wraps message consumers. Error handling
|
||||||
|
// should be provided by the implementation. Feel free to panic.
|
||||||
|
type Streamer interface {
|
||||||
|
Messages() <-chan []byte
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publisher is the interface that wraps message publishers. Error handling
|
||||||
|
// should be provided by the implementation. Feel free to panic.
|
||||||
|
type Publisher interface {
|
||||||
|
Publish(msg []byte)
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|
||||||
type task struct {
|
type task struct {
|
||||||
daemon Daemon
|
daemon Daemon
|
||||||
actor Actor
|
actor Actor
|
||||||
|
@ -44,12 +60,15 @@ func Summon() *Satan {
|
||||||
|
|
||||||
// AddDaemon adds a new daemon.
|
// AddDaemon adds a new daemon.
|
||||||
func (s *Satan) AddDaemon(d Daemon) {
|
func (s *Satan) AddDaemon(d Daemon) {
|
||||||
d.initialize(d, s.queue)
|
base := d.base()
|
||||||
if c, ok := d.(Consumer); ok {
|
base.self = d
|
||||||
c.setBackend(s.backend)
|
base.subscribeFunc = s.SubscribeFunc
|
||||||
}
|
base.publisher = s.Publisher
|
||||||
go d.Startup()
|
base.queue = s.queue
|
||||||
|
base.shutdown = make(chan struct{})
|
||||||
|
base.stats = newStatistics()
|
||||||
|
|
||||||
|
go d.Startup()
|
||||||
s.daemons = append(s.daemons, d)
|
s.daemons = append(s.daemons, d)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +95,6 @@ func (s *Satan) StopDaemons() {
|
||||||
close(s.shutdown)
|
close(s.shutdown)
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
close(s.queue)
|
close(s.queue)
|
||||||
s.backend.Close()
|
|
||||||
|
|
||||||
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
|
log.Printf("Task processing latency statistics:\n%s\n", s.latency.snapshot())
|
||||||
}
|
}
|
||||||
|
@ -112,20 +130,3 @@ func (s *Satan) processTask(t *task) {
|
||||||
dur := time.Now().UnixNano() - start.UnixNano()
|
dur := time.Now().UnixNano() - start.UnixNano()
|
||||||
t.daemon.base().stats.add(time.Duration(dur))
|
t.daemon.base().stats.add(time.Duration(dur))
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitializeKafka initializes Kafka backend.
|
|
||||||
func (s *Satan) InitializeKafka(id string, brokers []string) error {
|
|
||||||
k, err := kafka.New(id, brokers)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err = k.InitializeProducer(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err = k.InitializeConsumer(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.backend = k
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue