1
0
Fork 0

Handle task scheduling in case of premature process termination

This commit is contained in:
Gregory Eremin 2016-01-25 04:03:13 +03:00
parent 08cf3bf4da
commit 43a3bd98a1
1 changed files with 15 additions and 4 deletions

View File

@ -71,23 +71,24 @@ func (d *BaseDaemon) Process(a Actor) {
if d.limit != nil { if d.limit != nil {
d.limit.Wait(1) d.limit.Wait(1)
} }
d.queue <- &task{
d.tryEnqueue(&task{
daemon: d.self, daemon: d.self,
actor: a, actor: a,
createdAt: time.Now(), createdAt: time.Now(),
} })
} }
// SystemProcess creates a system task that is restarted in case of failure // SystemProcess creates a system task that is restarted in case of failure
// and then adds it to processing queue. // and then adds it to processing queue.
func (d *BaseDaemon) SystemProcess(name string, a Actor) { func (d *BaseDaemon) SystemProcess(name string, a Actor) {
d.queue <- &task{ d.tryEnqueue(&task{
daemon: d.self, daemon: d.self,
actor: a, actor: a,
createdAt: time.Now(), createdAt: time.Now(),
system: true, system: true,
name: name, name: name,
} })
} }
// Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg. // Subscribe subscriasdsdfsdgdfgdfsg sdgsdfg sdfgs dfgdfgdfg.
@ -191,6 +192,16 @@ func (d *BaseDaemon) base() *BaseDaemon {
return d return d
} }
func (d *BaseDaemon) tryEnqueue(t *task) {
defer func() {
if err := recover(); err != nil {
d.Logf("Failed to enqueue task %q due to process termination", t)
}
}()
d.queue <- t
}
func (d *BaseDaemon) handlePanic(err interface{}) { func (d *BaseDaemon) handlePanic(err interface{}) {
if d.panicHandler != nil { if d.panicHandler != nil {
d.panicHandler(err) d.panicHandler(err)