From 797e89212070a495432fdc7fa6ee2ab75b4455a3 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sat, 24 Oct 2015 19:25:16 +0300 Subject: [PATCH] Customizable logger --- daemon.go | 17 +++++++++++++---- example/daemons/number_printer.go | 5 ++--- example/main.go | 15 +++------------ satan.go | 18 +++++++++++------- 4 files changed, 29 insertions(+), 26 deletions(-) diff --git a/daemon.go b/daemon.go index 27af83f..ab6f747 100644 --- a/daemon.go +++ b/daemon.go @@ -45,12 +45,13 @@ type Daemon interface { // BaseDaemon is the parent structure for all daemons. type BaseDaemon struct { - subscribeFunc SubscribeFunc - publisher Publisher self Daemon name string queue chan<- *task + logger *log.Logger panicHandler PanicHandler + subscribeFunc SubscribeFunc + publisher Publisher shutdown chan struct{} limit *ratelimit.Bucket } @@ -127,10 +128,10 @@ func (d *BaseDaemon) Publish(msg []byte) { func (d *BaseDaemon) LimitRate(times int, per time.Duration) { rate := float64(time.Second) / float64(per) * float64(times) if rate <= 0 { - log.Println("Daemon %s processing rate was limited to %d. Using 1 instead", d.base(), rate) + d.logger.Println("Daemon %s processing rate was limited to %d. Using 1 instead", d.base(), rate) rate = 1.0 } - log.Printf("Daemon %s processing rate is limited to %.2f ops/s", d.base(), rate) + d.logger.Printf("Daemon %s processing rate is limited to %.2f ops/s", d.base(), rate) d.limit = ratelimit.NewBucketWithRate(rate, 1) } @@ -155,6 +156,14 @@ func (d *BaseDaemon) Continue() bool { } } +func (d *BaseDaemon) Log(v ...interface{}) { + d.logger.Println(v...) +} + +func (d *BaseDaemon) Logf(format string, v ...interface{}) { + d.logger.Printf(format, v...) +} + // String returns the name of the Deamon unerlying struct. func (d *BaseDaemon) String() string { if d.name == "" { diff --git a/example/daemons/number_printer.go b/example/daemons/number_printer.go index 41ca0b2..1290c61 100644 --- a/example/daemons/number_printer.go +++ b/example/daemons/number_printer.go @@ -1,7 +1,6 @@ package daemons import ( - "log" "math/rand" "time" @@ -16,7 +15,7 @@ type NumberPrinter struct { // Startup sets up panic handler and starts enqueuing number printing jobs. func (n *NumberPrinter) Startup() { n.HandlePanics(func(err interface{}) { - log.Printf("Oh, crap! There was a panic, take a look: %v", err) + n.Logf("Oh, crap! There was a panic, take a look: %v", err) }) n.LimitRate(3, time.Second) @@ -44,6 +43,6 @@ func (n *NumberPrinter) makeActor(num int) satan.Actor { panic("Nooooo! Random number generator returned a zero!") } - log.Println("Number printer says:", num) + n.Log("Number printer says:", num) } } diff --git a/example/main.go b/example/main.go index afb1ceb..0ca346b 100644 --- a/example/main.go +++ b/example/main.go @@ -2,8 +2,6 @@ package main import ( "flag" - "io/ioutil" - "log" "os" "os/signal" "strings" @@ -15,27 +13,20 @@ import ( ) func main() { - var debug bool var brokers string - flag.BoolVar(&debug, "v", false, "Verbose mode") flag.StringVar(&brokers, "brokers", "127.0.0.1:9092", "Kafka broker addresses separated by space") flag.Parse() - log.SetOutput(ioutil.Discard) - if debug { - log.SetOutput(os.Stderr) - } - kafka.Initialize(strings.Split(brokers, " ")) defer kafka.Shutdown() - logger := stats.NewStdoutLogger(0) - defer logger.Print() + statsLogger := stats.NewStdoutLogger(0) + defer statsLogger.Print() s := satan.Summon() s.SubscribeFunc = kafka.Subscribe - s.Statistics = logger + s.Statistics = statsLogger s.AddDaemon(&daemons.NumberPrinter{}) s.AddDaemon(&daemons.PriceConsumer{}) diff --git a/satan.go b/satan.go index 9613d2f..2e3e3df 100644 --- a/satan.go +++ b/satan.go @@ -3,6 +3,7 @@ package satan import ( "fmt" "log" + "os" "runtime/debug" "sync" "sync/atomic" @@ -14,6 +15,7 @@ type Satan struct { SubscribeFunc SubscribeFunc Publisher Publisher Statistics StatsPublisher + Logger *log.Logger daemons []Daemon queue chan *task @@ -83,6 +85,7 @@ var ( // Summon creates a new instance of Satan. func Summon() *Satan { return &Satan{ + Logger: log.New(os.Stdout, "[daemons] ", log.LstdFlags), queue: make(chan *task), shutdownWorkers: make(chan struct{}), shutdownSystem: make(chan struct{}), @@ -96,6 +99,7 @@ func (s *Satan) AddDaemon(d Daemon) { base.subscribeFunc = s.SubscribeFunc base.publisher = s.Publisher base.queue = s.queue + base.logger = s.Logger base.shutdown = s.shutdownSystem go d.Startup() @@ -137,11 +141,11 @@ func (s *Satan) runWorker() { defer s.wgWorkers.Done() i := atomic.AddUint64(&workerIndex, 1) - log.Printf("Starting worker #%d", i) + s.Logger.Printf("Starting worker #%d", i) defer func() { if err := recover(); err != nil { - log.Printf("Worker #%d crashed. Error: %v\n", i, err) + s.Logger.Printf("Worker #%d crashed. Error: %v\n", i, err) debug.PrintStack() go s.runWorker() // Restarting worker } @@ -155,7 +159,7 @@ func (s *Satan) runWorker() { s.Statistics.Add("TaskWait", time.Duration(dur)) s.processTask(t) case <-s.shutdownWorkers: - log.Printf("Worker #%d has stopped", i) + s.Logger.Printf("Worker #%d has stopped", i) return } } @@ -177,17 +181,17 @@ func (s *Satan) processSystemTask(t *task) { defer s.wgSystem.Done() defer func() { if err := recover(); err != nil { - log.Printf("System task %s recovered from a panic\nError: %v\n", t, err) + s.Logger.Printf("System task %s recovered from a panic\nError: %v\n", t, err) debug.PrintStack() t.createdAt = time.Now() s.queue <- t // Restarting task } else { - log.Printf("System task %s has stopped\n", t) + s.Logger.Printf("System task %s has stopped\n", t) } }() - log.Printf("Starting system task %s\n", t) + s.Logger.Printf("Starting system task %s\n", t) t.actor() // <--- ACTION STARTS HERE } @@ -198,7 +202,7 @@ func (s *Satan) processGeneralTask(t *task) { s.Statistics.Error(t.daemon.base().String()) } t.daemon.base().handlePanic(err) - log.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err) + s.Logger.Printf("Daemon %s recovered from a panic\nError: %v\n", t.daemon.base(), err) debug.PrintStack() } }()