ethlog: fix concurrency

Rather than spawning a new goroutine for each message,
run each log system in a dedicated goroutine.

Ensure that logging is still asynchronous by using a per-system buffer
(currently 500 messages). If it overflows all logging will hang,
but that's better than spawning indefinitely many goroutines.
poc8
Felix Lange 10 years ago
parent c090a77f1c
commit fd9b03a431
  1. 94
      ethlog/loggers.go

@ -47,75 +47,97 @@ const (
)
var (
mutex sync.RWMutex // protects logSystems
logSystems []LogSystem
logMessages = make(chan message)
drainWaitReq = make(chan chan struct{})
logMessageC = make(chan message)
addSystemC = make(chan LogSystem)
flushC = make(chan chan struct{})
resetC = make(chan chan struct{})
)
func init() {
go dispatchLoop()
}
// each system can buffer this many messages before
// blocking incoming log messages.
const sysBufferSize = 500
func dispatchLoop() {
var drainWait []chan struct{}
dispatchDone := make(chan struct{})
pending := 0
var (
systems []LogSystem
systemIn []chan message
systemWG sync.WaitGroup
)
bootSystem := func(sys LogSystem) {
in := make(chan message, sysBufferSize)
systemIn = append(systemIn, in)
systemWG.Add(1)
go sysLoop(sys, in, &systemWG)
}
for {
select {
case msg := <-logMessages:
go dispatch(msg, dispatchDone)
pending++
case waiter := <-drainWaitReq:
if pending == 0 {
close(waiter)
} else {
drainWait = append(drainWait, waiter)
case msg := <-logMessageC:
for _, c := range systemIn {
c <- msg
}
case sys := <-addSystemC:
systems = append(systems, sys)
bootSystem(sys)
case waiter := <-resetC:
// reset means terminate all systems
for _, c := range systemIn {
close(c)
}
case <-dispatchDone:
pending--
if pending == 0 {
for _, c := range drainWait {
systems = nil
systemIn = nil
systemWG.Wait()
close(waiter)
case waiter := <-flushC:
// flush means reboot all systems
for _, c := range systemIn {
close(c)
}
drainWait = nil
systemIn = nil
systemWG.Wait()
for _, sys := range systems {
bootSystem(sys)
}
close(waiter)
}
}
}
func dispatch(msg message, done chan<- struct{}) {
mutex.RLock()
for _, sys := range logSystems {
func sysLoop(sys LogSystem, in <-chan message, wg *sync.WaitGroup) {
for msg := range in {
if sys.GetLogLevel() >= msg.level {
sys.LogPrint(msg.level, msg.msg)
}
}
mutex.RUnlock()
done <- struct{}{}
wg.Done()
}
// Reset removes all active log systems.
// It blocks until all current messages have been delivered.
func Reset() {
mutex.Lock()
logSystems = nil
mutex.Unlock()
waiter := make(chan struct{})
resetC <- waiter
<-waiter
}
// Flush waits until all current log messages have been dispatched to
// the active log systems.
func Flush() {
waiter := make(chan struct{})
drainWaitReq <- waiter
flushC <- waiter
<-waiter
}
// AddLogSystem starts printing messages to the given LogSystem.
func AddLogSystem(logSystem LogSystem) {
mutex.Lock()
logSystems = append(logSystems, logSystem)
mutex.Unlock()
func AddLogSystem(sys LogSystem) {
addSystemC <- sys
}
// A Logger prints messages prefixed by a given tag. It provides named
@ -130,11 +152,11 @@ func NewLogger(tag string) *Logger {
}
func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
logMessages <- message{level, logger.tag + fmt.Sprintln(v...)}
logMessageC <- message{level, logger.tag + fmt.Sprintln(v...)}
}
func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
logMessages <- message{level, logger.tag + fmt.Sprintf(format, v...)}
logMessageC <- message{level, logger.tag + fmt.Sprintf(format, v...)}
}
// Errorln writes a message with ErrorLevel.

Loading…
Cancel
Save