1
0
Fork 0

Move task to a separate file, move processing logic to task.process

This commit is contained in:
Gregory Eremin 2015-10-17 06:16:00 +03:00
parent e8aebe3b1c
commit 559886f297
2 changed files with 47 additions and 40 deletions

View File

@ -1,9 +1,7 @@
package satan
import (
"fmt"
"log"
"runtime/debug"
"sync"
"time"
)
@ -40,14 +38,6 @@ type Publisher interface {
Close()
}
type task struct {
daemon Daemon
actor Actor
createdAt time.Time
system bool
name string
}
const (
defaultNumWorkers = 10
)
@ -112,7 +102,9 @@ func (s *Satan) runWorker(i int) {
case t := <-s.queue:
dur := time.Now().UnixNano() - t.createdAt.UnixNano()
s.latency.add(time.Duration(dur))
s.processTask(t)
if restart := t.process(); restart {
s.queue <- t
}
default:
select {
case <-s.shutdown:
@ -122,32 +114,3 @@ func (s *Satan) runWorker(i int) {
}
}
}
func (s *Satan) processTask(t *task) {
defer func(start time.Time) {
dur := time.Now().UnixNano() - start.UnixNano()
t.daemon.base().stats.add(time.Duration(dur))
if err := recover(); err != nil {
if t.system {
log.Printf("System process %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
// Restarting system task
s.queue <- t
} else {
t.daemon.base().handlePanic(err)
}
}
}(time.Now())
t.actor() // <--- THE ACTION HAPPENS HERE
}
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon.base())
}
return fmt.Sprintf("%s[%s]", t.daemon.base(), t.name)
}

44
task.go Normal file
View File

@ -0,0 +1,44 @@
package satan
import (
"fmt"
"log"
"runtime/debug"
"time"
)
type task struct {
daemon Daemon
actor Actor
createdAt time.Time
system bool
name string
}
func (t *task) process() (restart bool) {
defer func(start time.Time) {
dur := time.Now().UnixNano() - start.UnixNano()
t.daemon.base().stats.add(time.Duration(dur))
if err := recover(); err != nil {
if t.system {
log.Printf("System process %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
restart = true
} else {
t.daemon.base().handlePanic(err)
}
}
}(time.Now())
t.actor() // <--- THE ACTION HAPPENS HERE
return
}
func (t *task) String() string {
if t.name == "" {
return fmt.Sprintf("[unnamed %s process]", t.daemon.base())
}
return fmt.Sprintf("%s[%s]", t.daemon.base(), t.name)
}