1
0
Fork 0

Customizable logger

This commit is contained in:
Gregory Eremin 2015-10-24 19:25:16 +03:00
parent 4465d2c561
commit 797e892120
4 changed files with 29 additions and 26 deletions

View File

@ -45,12 +45,13 @@ type Daemon interface {
// BaseDaemon is the parent structure for all daemons.
type BaseDaemon struct {
subscribeFunc SubscribeFunc
publisher Publisher
self Daemon
name string
queue chan<- *task
logger *log.Logger
panicHandler PanicHandler
subscribeFunc SubscribeFunc
publisher Publisher
shutdown chan struct{}
limit *ratelimit.Bucket
}
@ -127,10 +128,10 @@ func (d *BaseDaemon) Publish(msg []byte) {
func (d *BaseDaemon) LimitRate(times int, per time.Duration) {
rate := float64(time.Second) / float64(per) * float64(times)
if rate <= 0 {
log.Println("Daemon %s processing rate was limited to %d. Using 1 instead", d.base(), rate)
d.logger.Println("Daemon %s processing rate was limited to %d. Using 1 instead", d.base(), rate)
rate = 1.0
}
log.Printf("Daemon %s processing rate is limited to %.2f ops/s", d.base(), rate)
d.logger.Printf("Daemon %s processing rate is limited to %.2f ops/s", d.base(), rate)
d.limit = ratelimit.NewBucketWithRate(rate, 1)
}
@ -155,6 +156,14 @@ func (d *BaseDaemon) Continue() bool {
}
}
func (d *BaseDaemon) Log(v ...interface{}) {
d.logger.Println(v...)
}
func (d *BaseDaemon) Logf(format string, v ...interface{}) {
d.logger.Printf(format, v...)
}
// String returns the name of the Deamon unerlying struct.
func (d *BaseDaemon) String() string {
if d.name == "" {

View File

@ -1,7 +1,6 @@
package daemons
import (
"log"
"math/rand"
"time"
@ -16,7 +15,7 @@ type NumberPrinter struct {
// Startup sets up panic handler and starts enqueuing number printing jobs.
func (n *NumberPrinter) Startup() {
n.HandlePanics(func(err interface{}) {
log.Printf("Oh, crap! There was a panic, take a look: %v", err)
n.Logf("Oh, crap! There was a panic, take a look: %v", err)
})
n.LimitRate(3, time.Second)
@ -44,6 +43,6 @@ func (n *NumberPrinter) makeActor(num int) satan.Actor {
panic("Nooooo! Random number generator returned a zero!")
}
log.Println("Number printer says:", num)
n.Log("Number printer says:", num)
}
}

View File

@ -2,8 +2,6 @@ package main
import (
"flag"
"io/ioutil"
"log"
"os"
"os/signal"
"strings"
@ -15,27 +13,20 @@ import (
)
func main() {
var debug bool
var brokers string
flag.BoolVar(&debug, "v", false, "Verbose mode")
flag.StringVar(&brokers, "brokers", "127.0.0.1:9092", "Kafka broker addresses separated by space")
flag.Parse()
log.SetOutput(ioutil.Discard)
if debug {
log.SetOutput(os.Stderr)
}
kafka.Initialize(strings.Split(brokers, " "))
defer kafka.Shutdown()
logger := stats.NewStdoutLogger(0)
defer logger.Print()
statsLogger := stats.NewStdoutLogger(0)
defer statsLogger.Print()
s := satan.Summon()
s.SubscribeFunc = kafka.Subscribe
s.Statistics = logger
s.Statistics = statsLogger
s.AddDaemon(&daemons.NumberPrinter{})
s.AddDaemon(&daemons.PriceConsumer{})

View File

@ -3,6 +3,7 @@ package satan
import (
"fmt"
"log"
"os"
"runtime/debug"
"sync"
"sync/atomic"
@ -14,6 +15,7 @@ type Satan struct {
SubscribeFunc SubscribeFunc
Publisher Publisher
Statistics StatsPublisher
Logger *log.Logger
daemons []Daemon
queue chan *task
@ -83,6 +85,7 @@ var (
// Summon creates a new instance of Satan.
func Summon() *Satan {
return &Satan{
Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags),
queue: make(chan *task),
shutdownWorkers: make(chan struct{}),
shutdownSystem: make(chan struct{}),
@ -96,6 +99,7 @@ func (s *Satan) AddDaemon(d Daemon) {
base.subscribeFunc = s.SubscribeFunc
base.publisher = s.Publisher
base.queue = s.queue
base.logger = s.Logger
base.shutdown = s.shutdownSystem
go d.Startup()
@ -137,11 +141,11 @@ func (s *Satan) runWorker() {
defer s.wgWorkers.Done()
i := atomic.AddUint64(&workerIndex, 1)
log.Printf("Starting worker #%d", i)
s.Logger.Printf("Starting worker #%d", i)
defer func() {
if err := recover(); err != nil {
log.Printf("Worker #%d crashed. Error: %v\n", i, err)
s.Logger.Printf("Worker #%d crashed. Error: %v\n", i, err)
debug.PrintStack()
go s.runWorker() // Restarting worker
}
@ -155,7 +159,7 @@ func (s *Satan) runWorker() {
s.Statistics.Add("TaskWait", time.Duration(dur))
s.processTask(t)
case <-s.shutdownWorkers:
log.Printf("Worker #%d has stopped", i)
s.Logger.Printf("Worker #%d has stopped", i)
return
}
}
@ -177,17 +181,17 @@ func (s *Satan) processSystemTask(t *task) {
defer s.wgSystem.Done()
defer func() {
if err := recover(); err != nil {
log.Printf("System task %s recovered from a panic\nError: %v\n", t, err)
s.Logger.Printf("System task %s recovered from a panic\nError: %v\n", t, err)
debug.PrintStack()
t.createdAt = time.Now()
s.queue <- t // Restarting task
} else {
log.Printf("System task %s has stopped\n", t)
s.Logger.Printf("System task %s has stopped\n", t)
}
}()
log.Printf("Starting system task %s\n", t)
s.Logger.Printf("Starting system task %s\n", t)
t.actor() // <--- ACTION STARTS HERE
}
@ -198,7 +202,7 @@ func (s *Satan) processGeneralTask(t *task) {
s.Statistics.Error(t.daemon.base().String())
}
t.daemon.base().handlePanic(err)
log.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err)
s.Logger.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err)
debug.PrintStack()
}
}()