It is possible now to run system processes in the shared pool
This commit is contained in:
parent
86c1b42540
commit
8dd218a366
73
daemon.go
73
daemon.go
@ -38,10 +38,11 @@ type Daemon interface {
|
|||||||
// base is a (hack) function that allows the Daemon interface to reference
|
// base is a (hack) function that allows the Daemon interface to reference
|
||||||
// underlying BaseDaemon structure.
|
// underlying BaseDaemon structure.
|
||||||
base() *BaseDaemon
|
base() *BaseDaemon
|
||||||
}
|
|
||||||
|
|
||||||
// Actor is a function that could be executed by daemon workers.
|
// initialize is also a hack that is used by the Satan to initialize
|
||||||
type Actor func()
|
// base daemon fields.
|
||||||
|
initialize(self Daemon, queue chan<- *task)
|
||||||
|
}
|
||||||
|
|
||||||
// BaseDaemon is the parent structure for all daemons.
|
// BaseDaemon is the parent structure for all daemons.
|
||||||
type BaseDaemon struct {
|
type BaseDaemon struct {
|
||||||
@ -54,61 +55,77 @@ type BaseDaemon struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process creates a task and then adds it to processing queue.
|
// Process creates a task and then adds it to processing queue.
|
||||||
func (b *BaseDaemon) Process(a Actor) {
|
func (d *BaseDaemon) Process(a Actor) {
|
||||||
b.queue <- &task{
|
d.enqueue(a, false)
|
||||||
daemon: b.self,
|
}
|
||||||
|
|
||||||
|
// SystemProcess creates a system task that is restarted in case of failure
|
||||||
|
// and then adds it to processing queue.
|
||||||
|
func (d *BaseDaemon) SystemProcess(a Actor) {
|
||||||
|
d.enqueue(a, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *BaseDaemon) enqueue(a Actor, system bool) {
|
||||||
|
d.queue <- &task{
|
||||||
|
daemon: d.self,
|
||||||
actor: a,
|
actor: a,
|
||||||
createdAt: time.Now(),
|
createdAt: time.Now(),
|
||||||
|
system: system,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandlePanics sets up a panic handler function for the daemon.
|
// HandlePanics sets up a panic handler function for the daemon.
|
||||||
func (b *BaseDaemon) HandlePanics(f func()) {
|
func (d *BaseDaemon) HandlePanics(f func()) {
|
||||||
b.panicHandler = f
|
d.panicHandler = f
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutdownRequested returns a channel that is closed the moment daemon shutdown
|
// ShutdownRequested returns a channel that is closed the moment daemon shutdown
|
||||||
// is requested.
|
// is requested.
|
||||||
func (b *BaseDaemon) ShutdownRequested() <-chan struct{} {
|
func (d *BaseDaemon) ShutdownRequested() <-chan struct{} {
|
||||||
return b.shutdown
|
return d.shutdown
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShouldShutdown returns true if daemon should shutdown and false otherwise.
|
// ShouldShutdown returns true if daemon should shutdown and false otherwise.
|
||||||
func (b *BaseDaemon) ShouldShutdown() bool {
|
func (d *BaseDaemon) ShouldShutdown() bool {
|
||||||
return b.shutdown == nil
|
select {
|
||||||
|
case <-d.shutdown:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns the name of the Deamon unerlying struct.
|
// String returns the name of the Deamon unerlying struct.
|
||||||
func (b *BaseDaemon) String() string {
|
func (d *BaseDaemon) String() string {
|
||||||
if b.name == "" {
|
if d.name == "" {
|
||||||
b.name = strings.Split(fmt.Sprintf("%T", b.self), ".")[1]
|
d.name = strings.Split(fmt.Sprintf("%T", d.self), ".")[1]
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.name
|
return d.name
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize saves a reference to the child daemon which is then used to print
|
// initialize saves a reference to the child daemon which is then used to print
|
||||||
// the daemons' name. It also initializes other struct fields.
|
// the daemons' name. It also initializes other struct fields.
|
||||||
func (b *BaseDaemon) initialize(self Daemon, queue chan<- *task) {
|
func (d *BaseDaemon) initialize(self Daemon, queue chan<- *task) {
|
||||||
b.self = self
|
d.self = self
|
||||||
b.stats = newStatistics()
|
d.stats = newStatistics()
|
||||||
b.queue = queue
|
d.queue = queue
|
||||||
b.shutdown = make(chan struct{})
|
d.shutdown = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// base is a (hack) function that allows the Daemon interface to reference
|
// base is a (hack) function that allows the Daemon interface to reference
|
||||||
// underlying BaseDaemon structure.
|
// underlying BaseDaemon structure.
|
||||||
func (b *BaseDaemon) base() *BaseDaemon {
|
func (d *BaseDaemon) base() *BaseDaemon {
|
||||||
return b
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BaseDaemon) handlePanic() {
|
func (d *BaseDaemon) handlePanic() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
b.stats.registerError()
|
d.stats.registerError()
|
||||||
if b.panicHandler != nil {
|
if d.panicHandler != nil {
|
||||||
b.panicHandler()
|
d.panicHandler()
|
||||||
}
|
}
|
||||||
log.Printf("Daemon %s recovered from panic. Error: %v\n", b, err)
|
log.Printf("Daemon %s recovered from panic. Error: %v\n", d, err)
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user