From fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e Mon Sep 17 00:00:00 2001
From: Felix Lange
Date: Thu, 16 Oct 2014 11:50:10 +0200
Subject: [PATCH] ethlog: fix concurrency

Rather than spawning a new goroutine for each message, run each log
system in a dedicated goroutine. Ensure that logging is still
asynchronous by using a per-system buffer (currently 500 messages).
If it overflows, all logging will hang, but that's better than
spawning indefinitely many goroutines.
---
 ethlog/loggers.go | 98 +++++++++++++++++++++++++++++------------------
 1 file changed, 60 insertions(+), 38 deletions(-)

diff --git a/ethlog/loggers.go b/ethlog/loggers.go
index f5ec4d4029..732d6a970f 100644
--- a/ethlog/loggers.go
+++ b/ethlog/loggers.go
@@ -47,75 +47,97 @@ const (
 )

 var (
-    mutex      sync.RWMutex // protects logSystems
-    logSystems []LogSystem
-
-    logMessages  = make(chan message)
-    drainWaitReq = make(chan chan struct{})
+    logMessageC = make(chan message)
+    addSystemC  = make(chan LogSystem)
+    flushC      = make(chan chan struct{})
+    resetC      = make(chan chan struct{})
 )

 func init() {
     go dispatchLoop()
 }

+// each system can buffer this many messages before
+// blocking incoming log messages.
+const sysBufferSize = 500
+
 func dispatchLoop() {
-    var drainWait []chan struct{}
-    dispatchDone := make(chan struct{})
-    pending := 0
+    var (
+        systems  []LogSystem
+        systemIn []chan message
+        systemWG sync.WaitGroup
+    )
+    bootSystem := func(sys LogSystem) {
+        in := make(chan message, sysBufferSize)
+        systemIn = append(systemIn, in)
+        systemWG.Add(1)
+        go sysLoop(sys, in, &systemWG)
+    }
+
     for {
         select {
-        case msg := <-logMessages:
-            go dispatch(msg, dispatchDone)
-            pending++
-        case waiter := <-drainWaitReq:
-            if pending == 0 {
-                close(waiter)
-            } else {
-                drainWait = append(drainWait, waiter)
+        case msg := <-logMessageC:
+            for _, c := range systemIn {
+                c <- msg
+            }
+
+        case sys := <-addSystemC:
+            systems = append(systems, sys)
+            bootSystem(sys)
+
+        case waiter := <-resetC:
+            // reset means terminate all systems
+            for _, c := range systemIn {
+                close(c)
+            }
+            systems = nil
+            systemIn = nil
+            systemWG.Wait()
+            close(waiter)
+
+        case waiter := <-flushC:
+            // flush means reboot all systems
+            for _, c := range systemIn {
+                close(c)
             }
-        case <-dispatchDone:
-            pending--
-            if pending == 0 {
-                for _, c := range drainWait {
-                    close(c)
-                }
-                drainWait = nil
+            systemIn = nil
+            systemWG.Wait()
+            for _, sys := range systems {
+                bootSystem(sys)
             }
+            close(waiter)
         }
     }
 }

-func dispatch(msg message, done chan<- struct{}) {
-    mutex.RLock()
-    for _, sys := range logSystems {
+func sysLoop(sys LogSystem, in <-chan message, wg *sync.WaitGroup) {
+    for msg := range in {
         if sys.GetLogLevel() >= msg.level {
             sys.LogPrint(msg.level, msg.msg)
         }
     }
-    mutex.RUnlock()
-    done <- struct{}{}
+    wg.Done()
 }

 // Reset removes all active log systems.
+// It blocks until all current messages have been delivered.
 func Reset() {
-    mutex.Lock()
-    logSystems = nil
-    mutex.Unlock()
+    waiter := make(chan struct{})
+    resetC <- waiter
+    <-waiter
 }

 // Flush waits until all current log messages have been dispatched to
 // the active log systems.
 func Flush() {
     waiter := make(chan struct{})
-    drainWaitReq <- waiter
+    flushC <- waiter
     <-waiter
 }

 // AddLogSystem starts printing messages to the given LogSystem.
-func AddLogSystem(logSystem LogSystem) {
-    mutex.Lock()
-    logSystems = append(logSystems, logSystem)
-    mutex.Unlock()
+func AddLogSystem(sys LogSystem) {
+    addSystemC <- sys
 }

 // A Logger prints messages prefixed by a given tag. It provides named
@@ -130,11 +152,11 @@ func NewLogger(tag string) *Logger {
 }

 func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
-    logMessages <- message{level, logger.tag + fmt.Sprintln(v...)}
+    logMessageC <- message{level, logger.tag + fmt.Sprintln(v...)}
 }

 func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
-    logMessages <- message{level, logger.tag + fmt.Sprintf(format, v...)}
+    logMessageC <- message{level, logger.tag + fmt.Sprintf(format, v...)}
 }

 // Errorln writes a message with ErrorLevel.
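
Below is a short usage sketch of the reworked API; it is not part of the diff.
It assumes the LogSystem interface needs only the two methods that sysLoop
calls (GetLogLevel and LogPrint); the real interface may require more. The
names stdoutSystem and exampleUsage, the "demo" tag, and the message text are
illustrative, and the sketch is written inside the ethlog package to avoid
guessing an import path.

    package ethlog

    import (
        "fmt"
        "os"
    )

    // stdoutSystem is an illustrative sink. Assumption: LogSystem requires
    // only the two methods that sysLoop calls; the real interface may
    // demand more.
    type stdoutSystem struct {
        level LogLevel
    }

    func (s stdoutSystem) GetLogLevel() LogLevel { return s.level }

    func (s stdoutSystem) LogPrint(level LogLevel, msg string) {
        fmt.Fprint(os.Stdout, msg)
    }

    // exampleUsage shows the call pattern the reworked dispatcher supports.
    func exampleUsage() {
        // AddLogSystem hands the sink to dispatchLoop, which boots a
        // dedicated goroutine with a 500-message buffer for it.
        AddLogSystem(stdoutSystem{level: ErrorLevel})

        logger := NewLogger("demo")
        logger.Errorln("something went wrong")

        // Flush blocks until every buffered message has been delivered,
        // then reboots the per-system goroutines.
        Flush()

        // Reset tears all per-system goroutines down again.
        Reset()
    }

Because each registered system drains its own buffered channel, a slow sink
only stalls the dispatcher once its 500-message buffer fills, instead of the
old scheme's one goroutine per pending message.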