From af2726414078840aae32aa4c982d3a8c5cf66ef5 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 4 Jul 2014 19:37:40 +0100 Subject: [PATCH 1/5] remove reactor code from ethutil --- ethutil/reactor.go | 87 ----------------------------------------- ethutil/reactor_test.go | 30 -------------- 2 files changed, 117 deletions(-) delete mode 100644 ethutil/reactor.go delete mode 100644 ethutil/reactor_test.go diff --git a/ethutil/reactor.go b/ethutil/reactor.go deleted file mode 100644 index 7cf1452456..0000000000 --- a/ethutil/reactor.go +++ /dev/null @@ -1,87 +0,0 @@ -package ethutil - -import ( - "sync" -) - -type ReactorEvent struct { - mut sync.Mutex - event string - chans []chan React -} - -// Post the specified reactor resource on the channels -// currently subscribed -func (e *ReactorEvent) Post(react React) { - e.mut.Lock() - defer e.mut.Unlock() - - for _, ch := range e.chans { - go func(ch chan React) { - ch <- react - }(ch) - } -} - -// Add a subscriber to this event -func (e *ReactorEvent) Add(ch chan React) { - e.mut.Lock() - defer e.mut.Unlock() - - e.chans = append(e.chans, ch) -} - -// Remove a subscriber -func (e *ReactorEvent) Remove(ch chan React) { - e.mut.Lock() - defer e.mut.Unlock() - - for i, c := range e.chans { - if c == ch { - e.chans = append(e.chans[:i], e.chans[i+1:]...) - } - } -} - -// Basic reactor resource -type React struct { - Resource interface{} - Event string -} - -// The reactor basic engine. Acts as bridge -// between the events and the subscribers/posters -type ReactorEngine struct { - patterns map[string]*ReactorEvent -} - -func NewReactorEngine() *ReactorEngine { - return &ReactorEngine{patterns: make(map[string]*ReactorEvent)} -} - -// Subscribe a channel to the specified event -func (reactor *ReactorEngine) Subscribe(event string, ch chan React) { - ev := reactor.patterns[event] - // Create a new event if one isn't available - if ev == nil { - ev = &ReactorEvent{event: event} - reactor.patterns[event] = ev - } - - // Add the channel to reactor event handler - ev.Add(ch) -} - -func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) { - ev := reactor.patterns[event] - if ev != nil { - ev.Remove(ch) - } -} - -func (reactor *ReactorEngine) Post(event string, resource interface{}) { - ev := reactor.patterns[event] - if ev != nil { - ev.Post(React{Resource: resource, Event: event}) - } -} diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go deleted file mode 100644 index 48c2f0df35..0000000000 --- a/ethutil/reactor_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package ethutil - -import "testing" - -func TestReactorAdd(t *testing.T) { - engine := NewReactorEngine() - ch := make(chan React) - engine.Subscribe("test", ch) - if len(engine.patterns) != 1 { - t.Error("Expected patterns to be 1, got", len(engine.patterns)) - } -} - -func TestReactorEvent(t *testing.T) { - engine := NewReactorEngine() - - // Buffer 1, so it doesn't block for this test - ch := make(chan React, 1) - engine.Subscribe("test", ch) - engine.Post("test", "hello") - - value := <-ch - if val, ok := value.Resource.(string); ok { - if val != "hello" { - t.Error("Expected Resource to be 'hello', got", val) - } - } else { - t.Error("Unable to cast") - } -} From 584d1c61ec93df3417f2ce8ece041b81a5ec63a6 Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 4 Jul 2014 19:38:44 +0100 Subject: [PATCH 2/5] use ethreact.Event and ethreact.ReactorEngine --- ethchain/dagger.go | 5 +++-- ethchain/state_manager.go | 3 ++- ethereum.go | 11 ++++++++--- ethminer/miner.go | 12 ++++++------ 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/ethchain/dagger.go b/ethchain/dagger.go index 4dda21ff5d..adf1c2f05e 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -3,6 +3,7 @@ package ethchain import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" "hash" @@ -14,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethutil.React) []byte + Search(block *Block, reactChan chan ethreact.Event) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool } @@ -22,7 +23,7 @@ type EasyPow struct { hash *big.Int } -func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte { +func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index cec424583a..962f95b290 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethtrie" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" @@ -36,7 +37,7 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethutil.ReactorEngine + Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool diff --git a/ethereum.go b/ethereum.go index 35d98e8311..de4e915a12 100644 --- a/ethereum.go +++ b/ethereum.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" @@ -71,7 +72,7 @@ type Ethereum struct { listening bool - reactor *ethutil.ReactorEngine + reactor *ethreact.ReactorEngine RpcServer *ethrpc.JsonRpcServer @@ -106,7 +107,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager keyManager: keyManager, clientIdentity: clientIdentity, } - ethereum.reactor = ethutil.NewReactorEngine() + ethereum.reactor = ethreact.New() ethereum.txPool = ethchain.NewTxPool(ethereum) ethereum.blockChain = ethchain.NewBlockChain(ethereum) @@ -118,7 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager return ethereum, nil } -func (s *Ethereum) Reactor() *ethutil.ReactorEngine { +func (s *Ethereum) Reactor() *ethreact.ReactorEngine { return s.reactor } @@ -350,6 +351,7 @@ func (s *Ethereum) ReapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { + s.reactor.Start() // Bind to addr and port ln, err := net.Listen("tcp", ":"+s.Port) if err != nil { @@ -461,6 +463,9 @@ func (s *Ethereum) Stop() { s.txPool.Stop() s.stateManager.Stop() + s.reactor.Flush() + s.reactor.Stop() + ethlogger.Infoln("Server stopped") close(s.shutdownChan) } diff --git a/ethminer/miner.go b/ethminer/miner.go index 71d4b2428a..8224c5441b 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -4,7 +4,7 @@ import ( "bytes" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethwire" "sort" ) @@ -15,19 +15,19 @@ type Miner struct { pow ethchain.PoW ethereum ethchain.EthManager coinbase []byte - reactChan chan ethutil.React + reactChan chan ethreact.Event txs ethchain.Transactions uncles []*ethchain.Block block *ethchain.Block powChan chan []byte - powQuitChan chan ethutil.React + powQuitChan chan ethreact.Event quitChan chan bool } func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner { - reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in - powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block - powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread + reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in + powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block + powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread quitChan := make(chan bool, 1) ethereum.Reactor().Subscribe("newBlock", reactChan) From 0c6f1c9c3aaba8e5a15ed50e90e65dd344afef8a Mon Sep 17 00:00:00 2001 From: zelig Date: Fri, 4 Jul 2014 19:38:53 +0100 Subject: [PATCH 3/5] ethreact - consistent renaming - React -> Event - ReactorEvent -> EventHandler - NewReactorEngine -> New - async ReactorEngine main loop with select on main eventChannel and quit channel - ReactorEngine main loop control with Start() Stop() Flush() - ReactorEngine.dispatch - use sync.RWMutex - delete eventHandler if subscribed channels go to 0 --- ethreact/README.md | 40 +++++++++ ethreact/reactor.go | 175 +++++++++++++++++++++++++++++++++++++++ ethreact/reactor_test.go | 63 ++++++++++++++ 3 files changed, 278 insertions(+) create mode 100644 ethreact/README.md create mode 100644 ethreact/reactor.go create mode 100644 ethreact/reactor_test.go diff --git a/ethreact/README.md b/ethreact/README.md new file mode 100644 index 0000000000..592b50b96a --- /dev/null +++ b/ethreact/README.md @@ -0,0 +1,40 @@ +# ethreact + +ethereum event reactor. Component of the ethereum stack. +various events like state change on an account or new block found are broadcast to subscribers. +Broadcasting to subscribers is running on its own routine and globally order preserving. + +## Clients +### subscribe + + eventChannel := make(chan ethreact.Event) + reactor.Subscribe(event, eventChannel) + +The same channel can be subscribed to multiple events but only once for each event. In order to allow order of events to be preserved, broadcast of events is synchronous within the main broadcast loop. Therefore any blocking subscriber channels will be skipped, i.e. missing broadcasting events while they are blocked. + +### unsubscribe + + reactor.Unsubscribe(event, eventChannel) + +### Processing events + +event.Resource is of type interface{}. The actual type of event.Resource depends on event.Name and may need to be cast for processing. + + var event ethreact.Event + for { + select { + case event = <-eventChannel: + processTransaction(event.Resource.(Transaction)) + } + } + +## Broadcast + + reactor := ethreact.New() + reactor.Start() + reactor.Post(name, resource) + reactor.Flush() // wait till all broadcast messages are dispatched + reactor.Stop() // stop the main broadcast loop immediately (even if there are unbroadcast events left) + + + diff --git a/ethreact/reactor.go b/ethreact/reactor.go new file mode 100644 index 0000000000..3802d95b33 --- /dev/null +++ b/ethreact/reactor.go @@ -0,0 +1,175 @@ +package ethreact + +import ( + "github.com/ethereum/eth-go/ethlog" + "sync" +) + +var logger = ethlog.NewLogger("REACTOR") + +type EventHandler struct { + lock sync.RWMutex + name string + chans []chan Event +} + +// Post the Event with the reactor resource on the channels +// currently subscribed to the event +func (e *EventHandler) Post(event Event) { + e.lock.RLock() + defer e.lock.RUnlock() + + // if we want to preserve order pushing to subscibed channels + // dispatching should be syncrounous + // this means if subscribed event channel is blocked (closed or has fixed capacity) + // the reactor dispatch will be blocked, so we need to mitigate by skipping + // rogue blocking subscribers + for i, ch := range e.chans { + select { + case ch <- event: + default: + logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name) + } + } +} + +// Add a subscriber to this event +func (e *EventHandler) Add(ch chan Event) { + e.lock.Lock() + defer e.lock.Unlock() + + e.chans = append(e.chans, ch) +} + +// Remove a subscriber +func (e *EventHandler) Remove(ch chan Event) int { + e.lock.Lock() + defer e.lock.Unlock() + + for i, c := range e.chans { + if c == ch { + e.chans = append(e.chans[:i], e.chans[i+1:]...) + } + } + return len(e.chans) +} + +// Basic reactor resource +type Event struct { + Resource interface{} + Name string +} + +// The reactor basic engine. Acts as bridge +// between the events and the subscribers/posters +type ReactorEngine struct { + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan bool + shutdownChannel chan bool + running bool + drained bool +} + +func New() *ReactorEngine { + return &ReactorEngine{ + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event), + quit: make(chan bool, 1), + shutdownChannel: make(chan bool, 1), + } +} + +func (reactor *ReactorEngine) Start() { + reactor.lock.Lock() + defer reactor.lock.Unlock() + if !reactor.running { + go func() { + out: + for { + select { + case <-reactor.quit: + break out + case event := <-reactor.eventChannel: + // needs to be called syncronously to keep order of events + reactor.dispatch(event) + default: + reactor.drained = true + } + } + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + close(reactor.shutdownChannel) + }() + reactor.running = true + logger.Infoln("started") + } +} + +func (reactor *ReactorEngine) Stop() { + reactor.lock.RLock() + if reactor.running { + reactor.quit <- true + } + reactor.lock.RUnlock() + <-reactor.shutdownChannel +} + +func (reactor *ReactorEngine) Flush() { + for !reactor.drained { + } +} + +// Subscribe a channel to the specified event +func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + // Create a new event handler if one isn't available + if eventHandler == nil { + eventHandler = &EventHandler{name: event} + reactor.eventHandlers[event] = eventHandler + } + // Add the events channel to reactor event handler + eventHandler.Add(eventChannel) + logger.Debugln("added new subscription to %s", event) +} + +func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + if eventHandler != nil { + len := eventHandler.Remove(eventChannel) + if len == 0 { + reactor.eventHandlers[event] = nil + } + logger.Debugln("removed subscription to %s", event) + } +} + +func (reactor *ReactorEngine) Post(event string, resource interface{}) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + if reactor.running { + reactor.drained = false + reactor.eventChannel <- Event{Resource: resource, Name: event} + } +} + +func (reactor *ReactorEngine) dispatch(event Event) { + name := event.Name + eventHandler := reactor.eventHandlers[name] + // if no subscriptions to this event type - no event handler created + // then noone to notify + if eventHandler != nil { + // needs to be called syncronously + eventHandler.Post(event) + } +} diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go new file mode 100644 index 0000000000..801a8abd01 --- /dev/null +++ b/ethreact/reactor_test.go @@ -0,0 +1,63 @@ +package ethreact + +import ( + "fmt" + "testing" +) + +func TestReactorAdd(t *testing.T) { + reactor := New() + ch := make(chan Event) + reactor.Subscribe("test", ch) + if reactor.eventHandlers["test"] == nil { + t.Error("Expected new eventHandler to be created") + } + reactor.Unsubscribe("test", ch) + if reactor.eventHandlers["test"] != nil { + t.Error("Expected eventHandler to be removed") + } +} + +func TestReactorEvent(t *testing.T) { + var name string + reactor := New() + // Buffer the channel, so it doesn't block for this test + cap := 20 + ch := make(chan Event, cap) + reactor.Subscribe("even", ch) + reactor.Subscribe("odd", ch) + reactor.Post("even", "disappears") // should not broadcast if engine not started + reactor.Start() + for i := 0; i < cap; i++ { + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + reactor.Post(name, i) + } + reactor.Post("test", cap) // this should not block + i := 0 + reactor.Flush() + close(ch) + for event := range ch { + fmt.Printf("%d: %v", i, event) + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + if val, ok := event.Resource.(int); ok { + if i != val || event.Name != name { + t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) + } + } else { + t.Error("Unable to cast") + } + i++ + } + if i != cap { + t.Error("excpected exactly %d events, got ", i) + } + reactor.Stop() +} From d4300c406c5f98d35857b6e53b0427be5f45e3b2 Mon Sep 17 00:00:00 2001 From: zelig Date: Sat, 5 Jul 2014 18:36:22 +0100 Subject: [PATCH 4/5] logger fix - introduce quit, drained, shutdown channels - mainLoop falls through reading message channel to drained state, and waits is blocked in default branch until any message is sent - Flush() waits for <-drained - Stop() pushes quit and nodges mainloop out of blocking drained state - package-global mutex - Reset() - clear tests --- ethlog/loggers.go | 57 +++++++++++++++++++++++++++++------------- ethlog/loggers_test.go | 23 ++++++++--------- 2 files changed, 50 insertions(+), 30 deletions(-) diff --git a/ethlog/loggers.go b/ethlog/loggers.go index 219c782408..d7707cf9e4 100644 --- a/ethlog/loggers.go +++ b/ethlog/loggers.go @@ -40,6 +40,9 @@ func (msg *logMessage) send(logger LogSystem) { var logMessages chan (*logMessage) var logSystems []LogSystem var quit chan bool +var drained chan bool +var shutdown chan bool +var mutex = sync.Mutex{} type LogLevel uint8 @@ -57,29 +60,41 @@ func start() { out: for { select { + case <-quit: + break out case msg := <-logMessages: for _, logSystem := range logSystems { if logSystem.GetLogLevel() >= msg.LogLevel { msg.send(logSystem) } } - case <-quit: - break out + case drained <- true: + default: + drained <- true // this blocks until a message is sent to the queu } } + close(shutdown) } -// waits until log messages are drained (dispatched to log writers) -func Flush() { - quit <- true - -done: - for { +func Reset() { + mutex.Lock() + defer mutex.Unlock() + if logSystems != nil { + quit <- true select { - case <-logMessages: - default: - break done + case <-drained: } + <-shutdown + } + logSystems = nil +} + +// waits until log messages are drained (dispatched to log writers) +func Flush() { + mutex.Lock() + defer mutex.Unlock() + if logSystems != nil { + <-drained } } @@ -92,28 +107,34 @@ func NewLogger(tag string) *Logger { } func AddLogSystem(logSystem LogSystem) { - var mutex = &sync.Mutex{} mutex.Lock() defer mutex.Unlock() if logSystems == nil { logMessages = make(chan *logMessage) quit = make(chan bool) + drained = make(chan bool, 1) + shutdown = make(chan bool, 1) go start() } logSystems = append(logSystems, logSystem) } +func send(msg *logMessage) { + select { + case <-drained: + } + logMessages <- msg +} + func (logger *Logger) sendln(level LogLevel, v ...interface{}) { - if logMessages != nil { - msg := newPrintlnLogMessage(level, logger.tag, v...) - logMessages <- msg + if logSystems != nil { + send(newPrintlnLogMessage(level, logger.tag, v...)) } } func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) { - if logMessages != nil { - msg := newPrintfLogMessage(level, logger.tag, format, v...) - logMessages <- msg + if logSystems != nil { + send(newPrintfLogMessage(level, logger.tag, format, v...)) } } diff --git a/ethlog/loggers_test.go b/ethlog/loggers_test.go index 89f4166810..9fff471c18 100644 --- a/ethlog/loggers_test.go +++ b/ethlog/loggers_test.go @@ -28,10 +28,6 @@ func (t *TestLogSystem) GetLogLevel() LogLevel { return t.level } -func quote(s string) string { - return fmt.Sprintf("'%s'", s) -} - func TestLoggerPrintln(t *testing.T) { logger := NewLogger("TEST") testLogSystem := &TestLogSystem{level: WarnLevel} @@ -41,10 +37,10 @@ func TestLoggerPrintln(t *testing.T) { logger.Infoln("info") logger.Debugln("debug") Flush() + Reset() output := testLogSystem.Output - fmt.Println(quote(output)) if output != "[TEST] error\n[TEST] warn\n" { - t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output)) + t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", testLogSystem.Output) } } @@ -57,10 +53,10 @@ func TestLoggerPrintf(t *testing.T) { logger.Infof("info") logger.Debugf("debug") Flush() + Reset() output := testLogSystem.Output - fmt.Println(quote(output)) if output != "[TEST] error to { 2}\n[TEST] warn" { - t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output)) + t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", testLogSystem.Output) } } @@ -73,13 +69,14 @@ func TestMultipleLogSystems(t *testing.T) { logger.Errorln("error") logger.Warnln("warn") Flush() + Reset() output0 := testLogSystem0.Output output1 := testLogSystem1.Output if output0 != "[TEST] error\n" { - t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output)) + t.Error("Expected logger 0 output '[TEST] error\\n', got ", testLogSystem0.Output) } if output1 != "[TEST] error\n[TEST] warn\n" { - t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output)) + t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", testLogSystem1.Output) } } @@ -92,11 +89,11 @@ func TestFileLogSystem(t *testing.T) { logger.Errorf("error to %s\n", filename) logger.Warnln("warn") Flush() + Reset() contents, _ := ioutil.ReadFile(filename) output := string(contents) - fmt.Println(quote(output)) if output != "[TEST] error to test.log\n[TEST] warn\n" { - t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output)) + t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output) } else { os.Remove(filename) } @@ -105,5 +102,7 @@ func TestFileLogSystem(t *testing.T) { func TestNoLogSystem(t *testing.T) { logger := NewLogger("TEST") logger.Warnln("warn") + fmt.Println("1") Flush() + Reset() } From 5a2afc575485e2d651b9840f5d1ea080cdc72fa7 Mon Sep 17 00:00:00 2001 From: zelig Date: Sat, 5 Jul 2014 19:56:01 +0100 Subject: [PATCH 5/5] fix reactor engine main loop blocked to wait if drained --- ethreact/reactor.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/ethreact/reactor.go b/ethreact/reactor.go index 3802d95b33..f42f71202b 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -28,7 +28,7 @@ func (e *EventHandler) Post(event Event) { select { case ch <- event: default: - logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name) + logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) } } } @@ -69,7 +69,7 @@ type ReactorEngine struct { quit chan bool shutdownChannel chan bool running bool - drained bool + drained chan bool } func New() *ReactorEngine { @@ -77,6 +77,7 @@ func New() *ReactorEngine { eventHandlers: make(map[string]*EventHandler), eventChannel: make(chan Event), quit: make(chan bool, 1), + drained: make(chan bool, 1), shutdownChannel: make(chan bool, 1), } } @@ -94,8 +95,9 @@ func (reactor *ReactorEngine) Start() { case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) + case reactor.drained <- true: default: - reactor.drained = true + reactor.drained <- true // blocking till message is coming in } } reactor.lock.Lock() @@ -113,14 +115,16 @@ func (reactor *ReactorEngine) Stop() { reactor.lock.RLock() if reactor.running { reactor.quit <- true + select { + case <-reactor.drained: + } } reactor.lock.RUnlock() <-reactor.shutdownChannel } func (reactor *ReactorEngine) Flush() { - for !reactor.drained { - } + <-reactor.drained } // Subscribe a channel to the specified event @@ -136,7 +140,7 @@ func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { } // Add the events channel to reactor event handler eventHandler.Add(eventChannel) - logger.Debugln("added new subscription to %s", event) + logger.Debugf("added new subscription to %s", event) } func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { @@ -149,7 +153,7 @@ func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) if len == 0 { reactor.eventHandlers[event] = nil } - logger.Debugln("removed subscription to %s", event) + logger.Debugf("removed subscription to %s", event) } } @@ -158,8 +162,10 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) { defer reactor.lock.Unlock() if reactor.running { - reactor.drained = false reactor.eventChannel <- Event{Resource: resource, Name: event} + select { + case <-reactor.drained: + } } }