From 4d77b7facecfea7069af15f19429585687c47fbb Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 7 Jul 2014 12:30:25 +0100 Subject: [PATCH 1/9] remove extra case in main loop --- ethreact/reactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethreact/reactor.go b/ethreact/reactor.go index f42f71202b..8e72ca903e 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -95,7 +95,7 @@ func (reactor *ReactorEngine) Start() { case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) - case reactor.drained <- true: + // case reactor.drained <- true: default: reactor.drained <- true // blocking till message is coming in } From 5c03adbdededd31cb73f64ced01e33154347e193 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 14 Jul 2014 18:37:01 +0100 Subject: [PATCH 2/9] fix logger channel blocking --- ethlog/loggers.go | 55 +++++++++++++++++++++--------------------- ethlog/loggers_test.go | 15 ++++++++++++ 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/ethlog/loggers.go b/ethlog/loggers.go index d7707cf9e4..4e950cadde 100644 --- a/ethlog/loggers.go +++ b/ethlog/loggers.go @@ -39,9 +39,8 @@ func (msg *logMessage) send(logger LogSystem) { var logMessages chan (*logMessage) var logSystems []LogSystem -var quit chan bool +var quit chan chan error var drained chan bool -var shutdown chan bool var mutex = sync.Mutex{} type LogLevel uint8 @@ -55,44 +54,54 @@ const ( DebugDetailLevel ) +func dispatch(msg *logMessage) { + for _, logSystem := range logSystems { + if logSystem.GetLogLevel() >= msg.LogLevel { + msg.send(logSystem) + } + } +} + // log messages are dispatched to log writers func start() { -out: for { select { - case <-quit: - break out + case status := <-quit: + status <- nil + return case msg := <-logMessages: - for _, logSystem := range logSystems { - if logSystem.GetLogLevel() >= msg.LogLevel { - msg.send(logSystem) - } - } - case drained <- true: + dispatch(msg) default: - drained <- true // this blocks until a message is sent to the queu + drained <- true // this blocks until a message is sent to the queue } } - close(shutdown) +} + +func send(msg *logMessage) { + logMessages <- msg + select { + case <-drained: + default: + } } func Reset() { mutex.Lock() defer mutex.Unlock() if logSystems != nil { - quit <- true + status := make(chan error) + quit <- status select { case <-drained: + default: } - <-shutdown + <-status } logSystems = nil } // waits until log messages are drained (dispatched to log writers) func Flush() { - mutex.Lock() - defer mutex.Unlock() if logSystems != nil { <-drained } @@ -110,22 +119,14 @@ func AddLogSystem(logSystem LogSystem) { mutex.Lock() defer mutex.Unlock() if logSystems == nil { - logMessages = make(chan *logMessage) - quit = make(chan bool) + logMessages = make(chan *logMessage, 5) + quit = make(chan chan error, 1) drained = make(chan bool, 1) - shutdown = make(chan bool, 1) go start() } logSystems = append(logSystems, logSystem) } -func send(msg *logMessage) { - select { - case <-drained: - } - logMessages <- msg -} - func (logger *Logger) sendln(level LogLevel, v ...interface{}) { if logSystems != nil { send(newPrintlnLogMessage(level, logger.tag, v...)) diff --git a/ethlog/loggers_test.go b/ethlog/loggers_test.go index 9fff471c18..0e1c12e551 100644 --- a/ethlog/loggers_test.go +++ b/ethlog/loggers_test.go @@ -28,6 +28,21 @@ func (t *TestLogSystem) GetLogLevel() LogLevel { return t.level } +func TestLoggerFlush(t *testing.T) { + logger := NewLogger("TEST") + testLogSystem := &TestLogSystem{level: WarnLevel} + AddLogSystem(testLogSystem) + for i := 0; i < 5; i++ { + logger.Errorf(".") + } + Flush() + Reset() + output := testLogSystem.Output + if output != "[TEST] .[TEST] .[TEST] .[TEST] .[TEST] ." { + t.Error("Expected complete logger output '[TEST] .[TEST] .[TEST] .[TEST] .[TEST] .', got ", output) + } +} + func TestLoggerPrintln(t *testing.T) { logger := NewLogger("TEST") testLogSystem := &TestLogSystem{level: WarnLevel} From dc11b5c55e2888a7a3dac51fedc3864d112136ce Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 14 Jul 2014 18:40:18 +0100 Subject: [PATCH 3/9] fix reactor channel blocking --- ethreact/reactor.go | 53 +++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/ethreact/reactor.go b/ethreact/reactor.go index 8e72ca903e..a26f82a97f 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -7,6 +7,10 @@ import ( var logger = ethlog.NewLogger("REACTOR") +const ( + eventBufferSize int = 10 +) + type EventHandler struct { lock sync.RWMutex name string @@ -21,7 +25,7 @@ func (e *EventHandler) Post(event Event) { // if we want to preserve order pushing to subscibed channels // dispatching should be syncrounous - // this means if subscribed event channel is blocked (closed or has fixed capacity) + // this means if subscribed event channel is blocked // the reactor dispatch will be blocked, so we need to mitigate by skipping // rogue blocking subscribers for i, ch := range e.chans { @@ -63,22 +67,20 @@ type Event struct { // The reactor basic engine. Acts as bridge // between the events and the subscribers/posters type ReactorEngine struct { - lock sync.RWMutex - eventChannel chan Event - eventHandlers map[string]*EventHandler - quit chan bool - shutdownChannel chan bool - running bool - drained chan bool + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan chan error + running bool + drained chan bool } func New() *ReactorEngine { return &ReactorEngine{ - eventHandlers: make(map[string]*EventHandler), - eventChannel: make(chan Event), - quit: make(chan bool, 1), - drained: make(chan bool, 1), - shutdownChannel: make(chan bool, 1), + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event, eventBufferSize), + quit: make(chan chan error, 1), + drained: make(chan bool, 1), } } @@ -87,24 +89,22 @@ func (reactor *ReactorEngine) Start() { defer reactor.lock.Unlock() if !reactor.running { go func() { - out: for { select { - case <-reactor.quit: - break out + case status := <-reactor.quit: + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + status <- nil + return case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) - // case reactor.drained <- true: default: reactor.drained <- true // blocking till message is coming in } } - reactor.lock.Lock() - defer reactor.lock.Unlock() - reactor.running = false - logger.Infoln("stopped") - close(reactor.shutdownChannel) }() reactor.running = true logger.Infoln("started") @@ -112,15 +112,15 @@ func (reactor *ReactorEngine) Start() { } func (reactor *ReactorEngine) Stop() { - reactor.lock.RLock() if reactor.running { - reactor.quit <- true + status := make(chan error) + reactor.quit <- status select { case <-reactor.drained: + default: } + <-status } - reactor.lock.RUnlock() - <-reactor.shutdownChannel } func (reactor *ReactorEngine) Flush() { @@ -165,6 +165,7 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) { reactor.eventChannel <- Event{Resource: resource, Name: event} select { case <-reactor.drained: + default: } } } From d1c89727dcf10c87bd1df68da9508cec047c56cf Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 14 Jul 2014 19:02:34 +0100 Subject: [PATCH 4/9] fix send overwritten by merge --- ethlog/loggers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethlog/loggers.go b/ethlog/loggers.go index f131861025..59021d1699 100644 --- a/ethlog/loggers.go +++ b/ethlog/loggers.go @@ -130,14 +130,14 @@ func AddLogSystem(logSystem LogSystem) { func (logger *Logger) sendln(level LogLevel, v ...interface{}) { if logMessages != nil { msg := newPrintlnLogMessage(level, logger.tag, v...) - logMessages <- msg + send(msg) } } func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) { if logMessages != nil { msg := newPrintfLogMessage(level, logger.tag, format, v...) - logMessages <- msg + send(msg) } } From 0ecc5c815e4550d7e8dca7b4ab4c549b10bf0b19 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 15 Jul 2014 00:15:37 +0100 Subject: [PATCH 5/9] reactor test --- ethreact/reactor_test.go | 63 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 ethreact/reactor_test.go diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go new file mode 100644 index 0000000000..801a8abd01 --- /dev/null +++ b/ethreact/reactor_test.go @@ -0,0 +1,63 @@ +package ethreact + +import ( + "fmt" + "testing" +) + +func TestReactorAdd(t *testing.T) { + reactor := New() + ch := make(chan Event) + reactor.Subscribe("test", ch) + if reactor.eventHandlers["test"] == nil { + t.Error("Expected new eventHandler to be created") + } + reactor.Unsubscribe("test", ch) + if reactor.eventHandlers["test"] != nil { + t.Error("Expected eventHandler to be removed") + } +} + +func TestReactorEvent(t *testing.T) { + var name string + reactor := New() + // Buffer the channel, so it doesn't block for this test + cap := 20 + ch := make(chan Event, cap) + reactor.Subscribe("even", ch) + reactor.Subscribe("odd", ch) + reactor.Post("even", "disappears") // should not broadcast if engine not started + reactor.Start() + for i := 0; i < cap; i++ { + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + reactor.Post(name, i) + } + reactor.Post("test", cap) // this should not block + i := 0 + reactor.Flush() + close(ch) + for event := range ch { + fmt.Printf("%d: %v", i, event) + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + if val, ok := event.Resource.(int); ok { + if i != val || event.Name != name { + t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) + } + } else { + t.Error("Unable to cast") + } + i++ + } + if i != cap { + t.Error("excpected exactly %d events, got ", i) + } + reactor.Stop() +} From 1735ec0362e84455126d8c1bd380ecae436d1167 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 15 Jul 2014 01:11:06 +0100 Subject: [PATCH 6/9] use ethreact.Event and ethreact.ReactorEngine --- ethchain/dagger.go | 5 ++- ethchain/state_manager.go | 3 +- ethereum.go | 10 +++-- ethminer/miner.go | 12 +++--- ethutil/reactor.go | 87 --------------------------------------- ethutil/reactor_test.go | 30 -------------- 6 files changed, 18 insertions(+), 129 deletions(-) delete mode 100644 ethutil/reactor.go delete mode 100644 ethutil/reactor_test.go diff --git a/ethchain/dagger.go b/ethchain/dagger.go index 4dda21ff5d..adf1c2f05e 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -3,6 +3,7 @@ package ethchain import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" "hash" @@ -14,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethutil.React) []byte + Search(block *Block, reactChan chan ethreact.Event) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool } @@ -22,7 +23,7 @@ type EasyPow struct { hash *big.Int } -func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte { +func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 62fcda8a5c..3eafd2d6e8 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethtrie" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" @@ -36,7 +37,7 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethutil.ReactorEngine + Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool diff --git a/ethereum.go b/ethereum.go index 2806dfd9dd..c2d2f52419 100644 --- a/ethereum.go +++ b/ethereum.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" @@ -73,7 +74,7 @@ type Ethereum struct { listening bool - reactor *ethutil.ReactorEngine + reactor *ethreact.ReactorEngine RpcServer *ethrpc.JsonRpcServer @@ -108,7 +109,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager keyManager: keyManager, clientIdentity: clientIdentity, } - ethereum.reactor = ethutil.NewReactorEngine() + ethereum.reactor = ethreact.New() ethereum.txPool = ethchain.NewTxPool(ethereum) ethereum.blockChain = ethchain.NewBlockChain(ethereum) @@ -120,7 +121,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager return ethereum, nil } -func (s *Ethereum) Reactor() *ethutil.ReactorEngine { +func (s *Ethereum) Reactor() *ethreact.ReactorEngine { return s.reactor } @@ -352,6 +353,7 @@ func (s *Ethereum) ReapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { + s.reactor.Start() // Bind to addr and port ln, err := net.Listen("tcp", ":"+s.Port) if err != nil { @@ -462,6 +464,8 @@ func (s *Ethereum) Stop() { } s.txPool.Stop() s.stateManager.Stop() + s.reactor.Flush() + s.reactor.Stop() ethlogger.Infoln("Server stopped") close(s.shutdownChan) diff --git a/ethminer/miner.go b/ethminer/miner.go index 71d4b2428a..8224c5441b 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -4,7 +4,7 @@ import ( "bytes" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethwire" "sort" ) @@ -15,19 +15,19 @@ type Miner struct { pow ethchain.PoW ethereum ethchain.EthManager coinbase []byte - reactChan chan ethutil.React + reactChan chan ethreact.Event txs ethchain.Transactions uncles []*ethchain.Block block *ethchain.Block powChan chan []byte - powQuitChan chan ethutil.React + powQuitChan chan ethreact.Event quitChan chan bool } func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner { - reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in - powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block - powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread + reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in + powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block + powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread quitChan := make(chan bool, 1) ethereum.Reactor().Subscribe("newBlock", reactChan) diff --git a/ethutil/reactor.go b/ethutil/reactor.go deleted file mode 100644 index 7cf1452456..0000000000 --- a/ethutil/reactor.go +++ /dev/null @@ -1,87 +0,0 @@ -package ethutil - -import ( - "sync" -) - -type ReactorEvent struct { - mut sync.Mutex - event string - chans []chan React -} - -// Post the specified reactor resource on the channels -// currently subscribed -func (e *ReactorEvent) Post(react React) { - e.mut.Lock() - defer e.mut.Unlock() - - for _, ch := range e.chans { - go func(ch chan React) { - ch <- react - }(ch) - } -} - -// Add a subscriber to this event -func (e *ReactorEvent) Add(ch chan React) { - e.mut.Lock() - defer e.mut.Unlock() - - e.chans = append(e.chans, ch) -} - -// Remove a subscriber -func (e *ReactorEvent) Remove(ch chan React) { - e.mut.Lock() - defer e.mut.Unlock() - - for i, c := range e.chans { - if c == ch { - e.chans = append(e.chans[:i], e.chans[i+1:]...) - } - } -} - -// Basic reactor resource -type React struct { - Resource interface{} - Event string -} - -// The reactor basic engine. Acts as bridge -// between the events and the subscribers/posters -type ReactorEngine struct { - patterns map[string]*ReactorEvent -} - -func NewReactorEngine() *ReactorEngine { - return &ReactorEngine{patterns: make(map[string]*ReactorEvent)} -} - -// Subscribe a channel to the specified event -func (reactor *ReactorEngine) Subscribe(event string, ch chan React) { - ev := reactor.patterns[event] - // Create a new event if one isn't available - if ev == nil { - ev = &ReactorEvent{event: event} - reactor.patterns[event] = ev - } - - // Add the channel to reactor event handler - ev.Add(ch) -} - -func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) { - ev := reactor.patterns[event] - if ev != nil { - ev.Remove(ch) - } -} - -func (reactor *ReactorEngine) Post(event string, resource interface{}) { - ev := reactor.patterns[event] - if ev != nil { - ev.Post(React{Resource: resource, Event: event}) - } -} diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go deleted file mode 100644 index 48c2f0df35..0000000000 --- a/ethutil/reactor_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package ethutil - -import "testing" - -func TestReactorAdd(t *testing.T) { - engine := NewReactorEngine() - ch := make(chan React) - engine.Subscribe("test", ch) - if len(engine.patterns) != 1 { - t.Error("Expected patterns to be 1, got", len(engine.patterns)) - } -} - -func TestReactorEvent(t *testing.T) { - engine := NewReactorEngine() - - // Buffer 1, so it doesn't block for this test - ch := make(chan React, 1) - engine.Subscribe("test", ch) - engine.Post("test", "hello") - - value := <-ch - if val, ok := value.Resource.(string); ok { - if val != "hello" { - t.Error("Expected Resource to be 'hello', got", val) - } - } else { - t.Error("Unable to cast") - } -} From 017d36e6b2e127084448dfb38bd1b8de7424e1c9 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 15 Jul 2014 01:12:45 +0100 Subject: [PATCH 7/9] properly unsubscribe react channels when miner stops - fixes write on closed chan crash --- ethminer/miner.go | 58 +++++++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/ethminer/miner.go b/ethminer/miner.go index 8224c5441b..5151ee8852 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -25,30 +25,10 @@ type Miner struct { } func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner { - reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in - powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block - powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread - quitChan := make(chan bool, 1) - - ethereum.Reactor().Subscribe("newBlock", reactChan) - ethereum.Reactor().Subscribe("newTx:pre", reactChan) - - // We need the quit chan to be a Reactor event. - // The POW search method is actually blocking and if we don't - // listen to the reactor events inside of the pow itself - // The miner overseer will never get the reactor events themselves - // Only after the miner will find the sha - ethereum.Reactor().Subscribe("newBlock", powQuitChan) - ethereum.Reactor().Subscribe("newTx:pre", powQuitChan) - miner := Miner{ - pow: ðchain.EasyPow{}, - ethereum: ethereum, - coinbase: coinbase, - reactChan: reactChan, - powChan: powChan, - powQuitChan: powQuitChan, - quitChan: quitChan, + pow: ðchain.EasyPow{}, + ethereum: ethereum, + coinbase: coinbase, } // Insert initial TXs in our little miner 'pool' @@ -59,9 +39,27 @@ func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner { } func (miner *Miner) Start() { + miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in + miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block + miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread + miner.quitChan = make(chan bool, 1) + // Prepare inital block //miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State()) go miner.listener() + + reactor := miner.ethereum.Reactor() + reactor.Subscribe("newBlock", miner.reactChan) + reactor.Subscribe("newTx:pre", miner.reactChan) + + // We need the quit chan to be a Reactor event. + // The POW search method is actually blocking and if we don't + // listen to the reactor events inside of the pow itself + // The miner overseer will never get the reactor events themselves + // Only after the miner will find the sha + reactor.Subscribe("newBlock", miner.powQuitChan) + reactor.Subscribe("newTx:pre", miner.powQuitChan) + logger.Infoln("Started") } @@ -127,12 +125,18 @@ out: } } -func (self *Miner) Stop() { +func (miner *Miner) Stop() { logger.Infoln("Stopping...") - self.quitChan <- true + miner.quitChan <- true + + reactor := miner.ethereum.Reactor() + reactor.Unsubscribe("newBlock", miner.powQuitChan) + reactor.Unsubscribe("newTx:pre", miner.powQuitChan) + reactor.Unsubscribe("newBlock", miner.reactChan) + reactor.Unsubscribe("newTx:pre", miner.reactChan) - close(self.powQuitChan) - close(self.quitChan) + close(miner.powQuitChan) + close(miner.quitChan) } func (self *Miner) mineNewBlock() { From 67528cf9709519458ab5500cb7cf54664dd20167 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 21 Jul 2014 15:10:56 +0100 Subject: [PATCH 8/9] ethreact/README.md --- ethreact/README.md | 28 ++++++++++++++++++++++++++++ ethreact/reactor.go | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 ethreact/README.md diff --git a/ethreact/README.md b/ethreact/README.md new file mode 100644 index 0000000000..61af8a5729 --- /dev/null +++ b/ethreact/README.md @@ -0,0 +1,28 @@ +## Reactor + +Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state. +Event notification is handled via subscription: + + var blockChan = make(chan ethreact.Event, 10) + reactor.Subscribe("newBlock", blockChan) + +ethreact.Event broadcast on the channel are + + type Event struct { + Resource interface{} + Name string + } + +Resource is polimorphic depending on the event type and should be typecast before use, e.g: + + b := <-blockChan: + block := b.Resource.(*ethchain.Block) + +Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped. + +The engine allows arbitrary events to be posted and subscribed to. + + ethereum.Reactor().Post("newBlock", newBlock) + + + \ No newline at end of file diff --git a/ethreact/reactor.go b/ethreact/reactor.go index a26f82a97f..7fe2356db7 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -58,7 +58,7 @@ func (e *EventHandler) Remove(ch chan Event) int { return len(e.chans) } -// Basic reactor resource +// Basic reactor event type Event struct { Resource interface{} Name string From 194c58858cd230a9a08b0eb14650720341a5580e Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 21 Jul 2014 19:12:04 +0100 Subject: [PATCH 9/9] send zero event to miner.powQuitChan fixes miner hanging --- ethminer/miner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ethminer/miner.go b/ethminer/miner.go index 8659d28894..e51b37e057 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -133,6 +133,9 @@ func (miner *Miner) listener() { func (miner *Miner) Stop() { logger.Infoln("Stopping...") + + miner.powQuitChan <- ethreact.Event{} + status := make(chan error) miner.quitChan <- status <-status @@ -143,9 +146,6 @@ func (miner *Miner) Stop() { reactor.Unsubscribe("newBlock", miner.reactChan) reactor.Unsubscribe("newTx:pre", miner.reactChan) - close(miner.powQuitChan) - close(miner.quitChan) - reactor.Post("miner:stop", miner) }