Merge pull request #58 from fjl/feature/event

Blocking event package
pull/150/head
Jeffrey Wilcke 10 years ago
commit df2b70853f
  1. 7
      ethchain/dagger.go
  2. 10
      ethchain/events.go
  3. 2
      ethchain/filter_test.go
  4. 18
      ethchain/helper_test.go
  5. 55
      ethchain/state_manager.go
  6. 3
      ethchain/transaction_pool.go
  7. 73
      ethereum.go
  8. 137
      ethminer/miner.go
  9. 28
      ethreact/README.md
  10. 183
      ethreact/reactor.go
  11. 63
      ethreact/reactor_test.go
  12. 183
      event/event.go
  13. 176
      event/event_test.go
  14. 42
      event/example_test.go
  15. 83
      eventer/eventer.go
  16. 113
      eventer/eventer_test.go
  17. 11
      events.go
  18. 2
      peer.go

@ -8,7 +8,6 @@ import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3"
)
@ -16,7 +15,7 @@ import (
var powlogger = ethlog.NewLogger("POW")
type PoW interface {
Search(block *Block, reactChan chan ethreact.Event) []byte
Search(block *Block, stop <-chan struct{}) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool
GetHashrate() int64
Turbo(bool)
@ -36,7 +35,7 @@ func (pow *EasyPow) Turbo(on bool) {
pow.turbo = on
}
func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
func (pow *EasyPow) Search(block *Block, stop <-chan struct{}) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce()
diff := block.Difficulty
@ -46,7 +45,7 @@ func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
for {
select {
case <-reactChan:
case <-stop:
powlogger.Infoln("Breaking from mining")
return nil
default:

@ -0,0 +1,10 @@
package ethchain
type TxEvent struct {
Type int // TxPre || TxPost
Tx *Transaction
}
type NewBlockEvent struct {
Block *Block
}

@ -3,5 +3,5 @@ package ethchain
import "testing"
func TestFilter(t *testing.T) {
filter := NewFilter()
NewFilter(NewTestManager())
}

@ -6,16 +6,17 @@ import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethdb"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
"github.com/ethereum/eth-go/event"
)
// Implement our EthTest Manager
type TestManager struct {
stateManager *StateManager
reactor *ethreact.ReactorEngine
eventMux *event.TypeMux
db ethutil.Database
txPool *TxPool
blockChain *BlockChain
Blocks []*Block
@ -49,8 +50,8 @@ func (tm *TestManager) StateManager() *StateManager {
return tm.stateManager
}
func (tm *TestManager) Reactor() *ethreact.ReactorEngine {
return tm.reactor
func (tm *TestManager) EventMux() *event.TypeMux {
return tm.eventMux
}
func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) {
fmt.Println("Broadcast not implemented")
@ -63,7 +64,10 @@ func (tm *TestManager) KeyManager() *ethcrypto.KeyManager {
return nil
}
func (tm *TestManager) Db() ethutil.Database { return nil }
func (tm *TestManager) Db() ethutil.Database {
return tm.db
}
func NewTestManager() *TestManager {
ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH")
@ -75,8 +79,8 @@ func NewTestManager() *TestManager {
ethutil.Config.Db = db
testManager := &TestManager{}
testManager.reactor = ethreact.New()
testManager.eventMux = new(event.TypeMux)
testManager.db = db
testManager.txPool = NewTxPool(testManager)
testManager.blockChain = NewBlockChain(testManager)
testManager.stateManager = NewStateManager(testManager)

@ -11,11 +11,10 @@ import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
"github.com/ethereum/eth-go/eventer"
"github.com/ethereum/eth-go/event"
)
var statelogger = ethlog.NewLogger("STATE")
@ -37,7 +36,6 @@ type EthManager interface {
BlockChain() *BlockChain
TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{})
Reactor() *ethreact.ReactorEngine
PeerCount() int
IsMining() bool
IsListening() bool
@ -45,7 +43,7 @@ type EthManager interface {
KeyManager() *ethcrypto.KeyManager
ClientIdentity() ethwire.ClientIdentity
Db() ethutil.Database
Eventer() *eventer.EventMachine
EventMux() *event.TypeMux
}
type StateManager struct {
@ -73,17 +71,15 @@ type StateManager struct {
// 'Process' & canonical validation.
lastAttemptedBlock *Block
// Quit chan
quit chan bool
events event.Subscription
}
func NewStateManager(ethereum EthManager) *StateManager {
sm := &StateManager{
mem: make(map[string]*big.Int),
Pow: &EasyPow{},
eth: ethereum,
bc: ethereum.BlockChain(),
quit: make(chan bool),
mem: make(map[string]*big.Int),
Pow: &EasyPow{},
eth: ethereum,
bc: ethereum.BlockChain(),
}
sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy()
sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy()
@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager {
func (self *StateManager) Start() {
statelogger.Debugln("Starting state manager")
self.events = self.eth.EventMux().Subscribe(Blocks(nil))
go self.updateThread()
}
func (self *StateManager) Stop() {
statelogger.Debugln("Stopping state manager")
close(self.quit)
self.events.Unsubscribe()
}
func (self *StateManager) updateThread() {
blockChan := self.eth.Eventer().Register("blocks")
out:
for {
select {
case event := <-blockChan:
blocks := event.Data.(Blocks)
for _, block := range blocks {
err := self.Process(block, false)
if err != nil {
statelogger.Infoln(err)
statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
statelogger.Debugln(block)
break
}
for ev := range self.events.Chan() {
for _, block := range ev.(Blocks) {
err := self.Process(block, false)
if err != nil {
statelogger.Infoln(err)
statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
statelogger.Debugln(block)
break
}
case <-self.quit:
break out
}
}
}
@ -202,7 +187,7 @@ done:
}
// Notify all subscribers
self.eth.Reactor().Post("newTx:post", tx)
self.eth.EventMux().Post(TxEvent{TxPost, tx})
receipts = append(receipts, receipt)
handled = append(handled, tx)
@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) {
statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4])
if dontReact == false {
sm.eth.Reactor().Post("newBlock", block)
sm.eth.EventMux().Post(NewBlockEvent{block})
state.Manifest().Reset()
}
@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter {
bloomf.Set(msg.From)
}
sm.eth.Reactor().Post("messages", state.Manifest().Messages)
sm.eth.EventMux().Post(state.Manifest().Messages)
return bloomf
}

@ -24,6 +24,7 @@ type TxMsgTy byte
const (
TxPre = iota
TxPost
minGasPrice = 1000000
)
@ -160,7 +161,7 @@ out:
txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash())
// Notify the subscribers
pool.Ethereum.Reactor().Post("newTx:pre", tx)
pool.Ethereum.EventMux().Post(TxEvent{TxPre, tx})
}
case <-pool.quit:
break out

@ -17,12 +17,11 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
"github.com/ethereum/eth-go/eventer"
"github.com/ethereum/eth-go/event"
)
const (
@ -60,7 +59,7 @@ type Ethereum struct {
// The block pool
blockPool *BlockPool
// Eventer
eventer *eventer.EventMachine
eventMux event.TypeMux
// Peers
peers *list.List
// Nonce
@ -85,8 +84,6 @@ type Ethereum struct {
listening bool
reactor *ethreact.ReactorEngine
RpcServer *ethrpc.JsonRpcServer
keyManager *ethcrypto.KeyManager
@ -129,8 +126,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
isUpToDate: true,
filters: make(map[int]*ethchain.Filter),
}
ethereum.reactor = ethreact.New()
ethereum.eventer = eventer.New()
ethereum.blockPool = NewBlockPool(ethereum)
ethereum.txPool = ethchain.NewTxPool(ethereum)
@ -143,10 +138,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil
}
func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor
}
func (s *Ethereum) KeyManager() *ethcrypto.KeyManager {
return s.keyManager
}
@ -169,8 +160,8 @@ func (s *Ethereum) TxPool() *ethchain.TxPool {
func (s *Ethereum) BlockPool() *BlockPool {
return s.blockPool
}
func (s *Ethereum) Eventer() *eventer.EventMachine {
return s.eventer
func (s *Ethereum) EventMux() *event.TypeMux {
return &s.eventMux
}
func (self *Ethereum) Db() ethutil.Database {
return self.db
@ -376,7 +367,7 @@ func (s *Ethereum) removePeerElement(e *list.Element) {
s.peers.Remove(e)
s.reactor.Post("peerList", s.peers)
s.eventMux.Post(PeerListEvent{s.peers})
}
func (s *Ethereum) RemovePeer(p *Peer) {
@ -400,7 +391,6 @@ func (s *Ethereum) reapDeadPeerHandler() {
// Start the ethereum
func (s *Ethereum) Start(seed bool) {
s.reactor.Start()
s.blockPool.Start()
s.stateManager.Start()
@ -524,8 +514,7 @@ func (s *Ethereum) Stop() {
}
s.txPool.Stop()
s.stateManager.Stop()
s.reactor.Flush()
s.reactor.Stop()
s.eventMux.Stop()
s.blockPool.Stop()
ethlogger.Infoln("Server stopped")
@ -584,10 +573,10 @@ out:
select {
case <-upToDateTimer.C:
if self.IsUpToDate() && !self.isUpToDate {
self.reactor.Post("chainSync", false)
self.eventMux.Post(ChainSyncEvent{false})
self.isUpToDate = true
} else if !self.IsUpToDate() && self.isUpToDate {
self.reactor.Post("chainSync", true)
self.eventMux.Post(ChainSyncEvent{true})
self.isUpToDate = false
}
case <-self.quit:
@ -623,40 +612,30 @@ func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
}
func (self *Ethereum) filterLoop() {
blockChan := make(chan ethreact.Event, 5)
messageChan := make(chan ethreact.Event, 5)
// Subscribe to events
reactor := self.Reactor()
reactor.Subscribe("newBlock", blockChan)
reactor.Subscribe("messages", messageChan)
out:
for {
select {
case <-self.quit:
break out
case block := <-blockChan:
if block, ok := block.Resource.(*ethchain.Block); ok {
self.filterMu.RLock()
for _, filter := range self.filters {
if filter.BlockCallback != nil {
filter.BlockCallback(block)
}
events := self.eventMux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil))
for event := range events.Chan() {
switch event := event.(type) {
case ethchain.NewBlockEvent:
self.filterMu.RLock()
for _, filter := range self.filters {
if filter.BlockCallback != nil {
filter.BlockCallback(event.Block)
}
self.filterMu.RUnlock()
}
case msg := <-messageChan:
if messages, ok := msg.Resource.(ethstate.Messages); ok {
self.filterMu.RLock()
for _, filter := range self.filters {
if filter.MessageCallback != nil {
msgs := filter.FilterMessages(messages)
if len(msgs) > 0 {
filter.MessageCallback(msgs)
}
self.filterMu.RUnlock()
case ethstate.Messages:
self.filterMu.RLock()
for _, filter := range self.filters {
if filter.MessageCallback != nil {
msgs := filter.FilterMessages(event)
if len(msgs) > 0 {
filter.MessageCallback(msgs)
}
}
self.filterMu.RUnlock()
}
self.filterMu.RUnlock()
}
}
}

@ -6,27 +6,37 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire"
"github.com/ethereum/eth-go/event"
)
var logger = ethlog.NewLogger("MINER")
type Miner struct {
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
reactChan chan ethreact.Event
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
powChan chan []byte
powQuitChan chan ethreact.Event
quitChan chan chan error
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
events event.Subscription
powQuitChan chan struct{}
powDone chan struct{}
turbo bool
}
const (
Started = iota
Stopped
)
type Event struct {
Type int // Started || Stopped
Miner *Miner
}
func (self *Miner) GetPow() ethchain.PoW {
return self.pow
}
@ -48,46 +58,42 @@ func (self *Miner) ToggleTurbo() {
}
func (miner *Miner) Start() {
miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block
miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
miner.quitChan = make(chan chan error, 1)
// Insert initial TXs in our little miner 'pool'
miner.txs = miner.ethereum.TxPool().Flush()
miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase)
mux := miner.ethereum.EventMux()
miner.events = mux.Subscribe(ethchain.NewBlockEvent{}, ethchain.TxEvent{})
// Prepare inital block
//miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State())
go miner.listener()
reactor := miner.ethereum.Reactor()
reactor.Subscribe("newBlock", miner.reactChan)
reactor.Subscribe("newTx:pre", miner.reactChan)
// We need the quit chan to be a Reactor event.
// The POW search method is actually blocking and if we don't
// listen to the reactor events inside of the pow itself
// The miner overseer will never get the reactor events themselves
// Only after the miner will find the sha
reactor.Subscribe("newBlock", miner.powQuitChan)
reactor.Subscribe("newTx:pre", miner.powQuitChan)
logger.Infoln("Started")
mux.Post(Event{Started, miner})
}
reactor.Post("miner:start", miner)
func (miner *Miner) Stop() {
logger.Infoln("Stopping...")
miner.events.Unsubscribe()
miner.ethereum.EventMux().Post(Event{Stopped, miner})
}
func (miner *Miner) listener() {
for {
miner.startMining()
select {
case status := <-miner.quitChan:
logger.Infoln("Stopped")
status <- nil
return
case chanMessage := <-miner.reactChan:
case event, isopen := <-miner.events.Chan():
miner.stopMining()
if !isopen {
return
}
if block, ok := chanMessage.Resource.(*ethchain.Block); ok {
switch event := event.(type) {
case ethchain.NewBlockEvent:
block := event.Block
//logger.Infoln("Got new block via Reactor")
if bytes.Compare(miner.ethereum.BlockChain().CurrentBlock.Hash(), block.Hash()) == 0 {
// TODO: Perhaps continue mining to get some uncle rewards
@ -117,49 +123,44 @@ func (miner *Miner) listener() {
miner.uncles = append(miner.uncles, block)
}
}
}
if tx, ok := chanMessage.Resource.(*ethchain.Transaction); ok {
found := false
for _, ctx := range miner.txs {
if found = bytes.Compare(ctx.Hash(), tx.Hash()) == 0; found {
break
case ethchain.TxEvent:
if event.Type == ethchain.TxPre {
found := false
for _, ctx := range miner.txs {
if found = bytes.Compare(ctx.Hash(), event.Tx.Hash()) == 0; found {
break
}
}
if found == false {
// Undo all previous commits
miner.block.Undo()
// Apply new transactions
miner.txs = append(miner.txs, event.Tx)
}
}
if found == false {
// Undo all previous commits
miner.block.Undo()
// Apply new transactions
miner.txs = append(miner.txs, tx)
}
}
default:
miner.mineNewBlock()
case <-miner.powDone:
// next iteration will start mining again
}
}
}
func (miner *Miner) Stop() {
logger.Infoln("Stopping...")
miner.powQuitChan <- ethreact.Event{}
status := make(chan error)
miner.quitChan <- status
<-status
reactor := miner.ethereum.Reactor()
reactor.Unsubscribe("newBlock", miner.powQuitChan)
reactor.Unsubscribe("newTx:pre", miner.powQuitChan)
reactor.Unsubscribe("newBlock", miner.reactChan)
reactor.Unsubscribe("newTx:pre", miner.reactChan)
func (miner *Miner) startMining() {
if miner.powDone == nil {
miner.powDone = make(chan struct{})
}
miner.powQuitChan = make(chan struct{})
go miner.mineNewBlock()
}
reactor.Post("miner:stop", miner)
func (miner *Miner) stopMining() {
close(miner.powQuitChan)
<-miner.powDone
}
func (self *Miner) mineNewBlock() {
stateManager := self.ethereum.StateManager()
self.block = self.ethereum.BlockChain().NewBlock(self.coinbase)
@ -195,8 +196,9 @@ func (self *Miner) mineNewBlock() {
logger.Infof("Mining on block. Includes %v transactions", len(self.txs))
// Find a valid nonce
self.block.Nonce = self.pow.Search(self.block, self.powQuitChan)
if self.block.Nonce != nil {
nonce := self.pow.Search(self.block, self.powQuitChan)
if nonce != nil {
self.block.Nonce = nonce
err := self.ethereum.StateManager().Process(self.block, false)
if err != nil {
logger.Infoln(err)
@ -208,4 +210,5 @@ func (self *Miner) mineNewBlock() {
self.txs = self.ethereum.TxPool().CurrentTransactions()
}
}
self.powDone <- struct{}{}
}

@ -1,28 +0,0 @@
## Reactor
Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state.
Event notification is handled via subscription:
var blockChan = make(chan ethreact.Event, 10)
reactor.Subscribe("newBlock", blockChan)
ethreact.Event broadcast on the channel are
type Event struct {
Resource interface{}
Name string
}
Resource is polimorphic depending on the event type and should be typecast before use, e.g:
b := <-blockChan:
block := b.Resource.(*ethchain.Block)
Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped.
The engine allows arbitrary events to be posted and subscribed to.
ethereum.Reactor().Post("newBlock", newBlock)

@ -1,183 +0,0 @@
package ethreact
import (
"sync"
"github.com/ethereum/eth-go/ethlog"
)
var logger = ethlog.NewLogger("REACTOR")
const (
eventBufferSize int = 10
)
type EventHandler struct {
lock sync.RWMutex
name string
chans []chan Event
}
// Post the Event with the reactor resource on the channels
// currently subscribed to the event
func (e *EventHandler) Post(event Event) {
e.lock.RLock()
defer e.lock.RUnlock()
// if we want to preserve order pushing to subscibed channels
// dispatching should be syncrounous
// this means if subscribed event channel is blocked
// the reactor dispatch will be blocked, so we need to mitigate by skipping
// rogue blocking subscribers
for i, ch := range e.chans {
select {
case ch <- event:
default:
logger.Debugf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
}
}
}
// Add a subscriber to this event
func (e *EventHandler) Add(ch chan Event) {
e.lock.Lock()
defer e.lock.Unlock()
e.chans = append(e.chans, ch)
}
// Remove a subscriber
func (e *EventHandler) Remove(ch chan Event) int {
e.lock.Lock()
defer e.lock.Unlock()
for i, c := range e.chans {
if c == ch {
e.chans = append(e.chans[:i], e.chans[i+1:]...)
}
}
return len(e.chans)
}
// Basic reactor event
type Event struct {
Resource interface{}
Name string
}
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
lock sync.RWMutex
eventChannel chan Event
eventHandlers map[string]*EventHandler
quit chan chan error
running bool
drained chan bool
}
func New() *ReactorEngine {
return &ReactorEngine{
eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event, eventBufferSize),
quit: make(chan chan error, 1),
drained: make(chan bool, 1),
}
}
func (reactor *ReactorEngine) Start() {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if !reactor.running {
go func() {
for {
select {
case status := <-reactor.quit:
reactor.lock.Lock()
defer reactor.lock.Unlock()
reactor.running = false
logger.Infoln("stopped")
status <- nil
return
case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events
reactor.dispatch(event)
default:
reactor.drained <- true // blocking till message is coming in
}
}
}()
reactor.running = true
logger.Infoln("started")
}
}
func (reactor *ReactorEngine) Stop() {
if reactor.running {
status := make(chan error)
reactor.quit <- status
select {
case <-reactor.drained:
default:
}
<-status
}
}
func (reactor *ReactorEngine) Flush() {
<-reactor.drained
}
// Subscribe a channel to the specified event
func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
// Create a new event handler if one isn't available
if eventHandler == nil {
eventHandler = &EventHandler{name: event}
reactor.eventHandlers[event] = eventHandler
}
// Add the events channel to reactor event handler
eventHandler.Add(eventChannel)
logger.Debugf("added new subscription to %s", event)
}
func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
if eventHandler != nil {
len := eventHandler.Remove(eventChannel)
if len == 0 {
reactor.eventHandlers[event] = nil
}
logger.Debugf("removed subscription to %s", event)
}
}
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if reactor.running {
reactor.eventChannel <- Event{Resource: resource, Name: event}
select {
case <-reactor.drained:
default:
}
}
}
func (reactor *ReactorEngine) dispatch(event Event) {
name := event.Name
eventHandler := reactor.eventHandlers[name]
// if no subscriptions to this event type - no event handler created
// then noone to notify
if eventHandler != nil {
// needs to be called syncronously
eventHandler.Post(event)
}
}

@ -1,63 +0,0 @@
package ethreact
import (
"fmt"
"testing"
)
func TestReactorAdd(t *testing.T) {
reactor := New()
ch := make(chan Event)
reactor.Subscribe("test", ch)
if reactor.eventHandlers["test"] == nil {
t.Error("Expected new eventHandler to be created")
}
reactor.Unsubscribe("test", ch)
if reactor.eventHandlers["test"] != nil {
t.Error("Expected eventHandler to be removed")
}
}
func TestReactorEvent(t *testing.T) {
var name string
reactor := New()
// Buffer the channel, so it doesn't block for this test
cap := 20
ch := make(chan Event, cap)
reactor.Subscribe("even", ch)
reactor.Subscribe("odd", ch)
reactor.Post("even", "disappears") // should not broadcast if engine not started
reactor.Start()
for i := 0; i < cap; i++ {
if i%2 == 0 {
name = "even"
} else {
name = "odd"
}
reactor.Post(name, i)
}
reactor.Post("test", cap) // this should not block
i := 0
reactor.Flush()
close(ch)
for event := range ch {
fmt.Printf("%d: %v", i, event)
if i%2 == 0 {
name = "even"
} else {
name = "odd"
}
if val, ok := event.Resource.(int); ok {
if i != val || event.Name != name {
t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val)
}
} else {
t.Error("Unable to cast")
}
i++
}
if i != cap {
t.Error("excpected exactly %d events, got ", i)
}
reactor.Stop()
}

@ -0,0 +1,183 @@
// Package event implements an event multiplexer.
package event
import (
"errors"
"fmt"
"reflect"
"sync"
)
// Subscription is implemented by event subscriptions.
type Subscription interface {
// Chan returns a channel that carries events.
// Implementations should return the same channel
// for any subsequent calls to Chan.
Chan() <-chan interface{}
// Unsubscribe stops delivery of events to a subscription.
// The event channel is closed.
// Unsubscribe can be called more than once.
Unsubscribe()
}
// A TypeMux dispatches events to registered receivers. Receivers can be
// registered to handle events of certain type. Any operation
// called after mux is stopped will return ErrMuxClosed.
//
// The zero value is ready to use.
type TypeMux struct {
mutex sync.RWMutex
subm map[reflect.Type][]*muxsub
stopped bool
}
// ErrMuxClosed is returned when Posting on a closed TypeMux.
var ErrMuxClosed = errors.New("event: mux closed")
// Subscribe creates a subscription for events of the given types. The
// subscription's channel is closed when it is unsubscribed
// or the mux is closed.
func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
sub := newsub(mux)
mux.mutex.Lock()
defer mux.mutex.Unlock()
if mux.stopped {
close(sub.postC)
} else {
if mux.subm == nil {
mux.subm = make(map[reflect.Type][]*muxsub)
}
for _, t := range types {
rtyp := reflect.TypeOf(t)
oldsubs := mux.subm[rtyp]
if find(oldsubs, sub) != -1 {
panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
}
subs := make([]*muxsub, len(oldsubs)+1)
copy(subs, oldsubs)
subs[len(oldsubs)] = sub
mux.subm[rtyp] = subs
}
}
return sub
}
// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
func (mux *TypeMux) Post(ev interface{}) error {
rtyp := reflect.TypeOf(ev)
mux.mutex.RLock()
if mux.stopped {
mux.mutex.RUnlock()
return ErrMuxClosed
}
subs := mux.subm[rtyp]
mux.mutex.RUnlock()
for _, sub := range subs {
sub.deliver(ev)
}
return nil
}
// Stop closes a mux. The mux can no longer be used.
// Future Post calls will fail with ErrMuxClosed.
// Stop blocks until all current deliveries have finished.
func (mux *TypeMux) Stop() {
mux.mutex.Lock()
for _, subs := range mux.subm {
for _, sub := range subs {
sub.closewait()
}
}
mux.subm = nil
mux.stopped = true
mux.mutex.Unlock()
}
func (mux *TypeMux) del(s *muxsub) {
mux.mutex.Lock()
for typ, subs := range mux.subm {
if pos := find(subs, s); pos >= 0 {
if len(subs) == 1 {
delete(mux.subm, typ)
} else {
mux.subm[typ] = posdelete(subs, pos)
}
}
}
s.mux.mutex.Unlock()
}
func find(slice []*muxsub, item *muxsub) int {
for i, v := range slice {
if v == item {
return i
}
}
return -1
}
func posdelete(slice []*muxsub, pos int) []*muxsub {
news := make([]*muxsub, len(slice)-1)
copy(news[:pos], slice[:pos])
copy(news[pos:], slice[pos+1:])
return news
}
type muxsub struct {
mux *TypeMux
closeMu sync.Mutex
closing chan struct{}
closed bool
// these two are the same channel. they are stored separately so
// postC can be set to nil without affecting the return value of
// Chan.
postMu sync.RWMutex
readC <-chan interface{}
postC chan<- interface{}
}
func newsub(mux *TypeMux) *muxsub {
c := make(chan interface{})
return &muxsub{
mux: mux,
readC: c,
postC: c,
closing: make(chan struct{}),
}
}
func (s *muxsub) Chan() <-chan interface{} {
return s.readC
}
func (s *muxsub) Unsubscribe() {
s.mux.del(s)
s.closewait()
}
func (s *muxsub) closewait() {
s.closeMu.Lock()
defer s.closeMu.Unlock()
if s.closed {
return
}
close(s.closing)
s.closed = true
s.postMu.Lock()
close(s.postC)
s.postC = nil
s.postMu.Unlock()
}
func (s *muxsub) deliver(ev interface{}) {
s.postMu.RLock()
select {
case s.postC <- ev:
case <-s.closing:
}
s.postMu.RUnlock()
}

@ -0,0 +1,176 @@
package event
import (
"math/rand"
"sync"
"testing"
"time"
)
type testEvent int
func TestSub(t *testing.T) {
mux := new(TypeMux)
defer mux.Stop()
sub := mux.Subscribe(testEvent(0))
go func() {
if err := mux.Post(testEvent(5)); err != nil {
t.Errorf("Post returned unexpected error: %v", err)
}
}()
ev := <-sub.Chan()
if ev.(testEvent) != testEvent(5) {
t.Errorf("Got %v (%T), expected event %v (%T)",
ev, ev, testEvent(5), testEvent(5))
}
}
func TestMuxErrorAfterStop(t *testing.T) {
mux := new(TypeMux)
mux.Stop()
sub := mux.Subscribe(testEvent(0))
if _, isopen := <-sub.Chan(); isopen {
t.Errorf("subscription channel was not closed")
}
if err := mux.Post(testEvent(0)); err != ErrMuxClosed {
t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed)
}
}
func TestUnsubscribeUnblockPost(t *testing.T) {
mux := new(TypeMux)
defer mux.Stop()
sub := mux.Subscribe(testEvent(0))
unblocked := make(chan bool)
go func() {
mux.Post(testEvent(5))
unblocked <- true
}()
select {
case <-unblocked:
t.Errorf("Post returned before Unsubscribe")
default:
sub.Unsubscribe()
<-unblocked
}
}
func TestSubscribeDuplicateType(t *testing.T) {
mux := new(TypeMux)
expected := "event: duplicate type event.testEvent in Subscribe"
defer func() {
err := recover()
if err == nil {
t.Errorf("Subscribe didn't panic for duplicate type")
} else if err != expected {
t.Errorf("panic mismatch: got %#v, expected %#v", err, expected)
}
}()
mux.Subscribe(testEvent(1), testEvent(2))
}
func TestMuxConcurrent(t *testing.T) {
rand.Seed(time.Now().Unix())
mux := new(TypeMux)
defer mux.Stop()
recv := make(chan int)
poster := func() {
for {
err := mux.Post(testEvent(0))
if err != nil {
return
}
}
}
sub := func(i int) {
time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
sub := mux.Subscribe(testEvent(0))
<-sub.Chan()
sub.Unsubscribe()
recv <- i
}
go poster()
go poster()
go poster()
nsubs := 1000
for i := 0; i < nsubs; i++ {
go sub(i)
}
// wait until everyone has been served
counts := make(map[int]int, nsubs)
for i := 0; i < nsubs; i++ {
counts[<-recv]++
}
for i, count := range counts {
if count != 1 {
t.Errorf("receiver %d called %d times, expected only 1 call", i, count)
}
}
}
func emptySubscriber(mux *TypeMux, types ...interface{}) {
s := mux.Subscribe(testEvent(0))
go func() {
for _ = range s.Chan() {
}
}()
}
func BenchmarkPost3(b *testing.B) {
var mux = new(TypeMux)
defer mux.Stop()
emptySubscriber(mux, testEvent(0))
emptySubscriber(mux, testEvent(0))
emptySubscriber(mux, testEvent(0))
for i := 0; i < b.N; i++ {
mux.Post(testEvent(0))
}
}
func BenchmarkPostConcurrent(b *testing.B) {
var mux = new(TypeMux)
defer mux.Stop()
emptySubscriber(mux, testEvent(0))
emptySubscriber(mux, testEvent(0))
emptySubscriber(mux, testEvent(0))
var wg sync.WaitGroup
poster := func() {
for i := 0; i < b.N; i++ {
mux.Post(testEvent(0))
}
wg.Done()
}
wg.Add(5)
for i := 0; i < 5; i++ {
go poster()
}
wg.Wait()
}
// for comparison
func BenchmarkChanSend(b *testing.B) {
c := make(chan interface{})
closed := make(chan struct{})
go func() {
for _ = range c {
}
}()
for i := 0; i < b.N; i++ {
select {
case c <- i:
case <-closed:
}
}
}

@ -0,0 +1,42 @@
package event
import "fmt"
func ExampleTypeMux() {
type someEvent struct{ I int }
type otherEvent struct{ S string }
type yetAnotherEvent struct{ X, Y int }
var mux TypeMux
// Start a subscriber.
done := make(chan struct{})
sub := mux.Subscribe(someEvent{}, otherEvent{})
go func() {
for event := range sub.Chan() {
fmt.Printf("Received: %#v\n", event)
}
fmt.Println("done")
close(done)
}()
// Post some events.
mux.Post(someEvent{5})
mux.Post(yetAnotherEvent{X: 3, Y: 4})
mux.Post(someEvent{6})
mux.Post(otherEvent{"whoa"})
// Stop closes all subscription channels.
// The subscriber goroutine will print "done"
// and exit.
mux.Stop()
// Wait for subscriber to return.
<-done
// Output:
// Received: event.someEvent{I:5}
// Received: event.someEvent{I:6}
// Received: event.otherEvent{S:"whoa"}
// done
}

@ -1,83 +0,0 @@
package eventer
import "sync"
// Basic receiver interface.
type Receiver interface {
Send(Event)
}
// Receiver as channel
type Channel chan Event
func (self Channel) Send(ev Event) {
self <- ev
}
// Receiver as function
type Function func(ev Event)
func (self Function) Send(ev Event) {
self(ev)
}
type Event struct {
Type string
Data interface{}
}
type Channels map[string][]Receiver
type EventMachine struct {
mu sync.RWMutex
channels Channels
}
func New() *EventMachine {
return &EventMachine{channels: make(Channels)}
}
func (self *EventMachine) add(typ string, r Receiver) {
self.mu.Lock()
self.channels[typ] = append(self.channels[typ], r)
self.mu.Unlock()
}
// Generalised methods for the known receiver types
// * Channel
// * Function
func (self *EventMachine) On(typ string, r interface{}) {
if eventFunc, ok := r.(func(Event)); ok {
self.RegisterFunc(typ, eventFunc)
} else if eventChan, ok := r.(Channel); ok {
self.RegisterChannel(typ, eventChan)
} else {
panic("Invalid type for EventMachine::On")
}
}
func (self *EventMachine) RegisterChannel(typ string, c Channel) {
self.add(typ, c)
}
func (self *EventMachine) RegisterFunc(typ string, f Function) {
self.add(typ, f)
}
func (self *EventMachine) Register(typ string) Channel {
c := make(Channel, 1)
self.add(typ, c)
return c
}
func (self *EventMachine) Post(typ string, data interface{}) {
self.mu.RLock()
if self.channels[typ] != nil {
ev := Event{typ, data}
for _, receiver := range self.channels[typ] {
// Blocking is OK. These are internals and need to be handled
receiver.Send(ev)
}
}
self.mu.RUnlock()
}

@ -1,113 +0,0 @@
package eventer
import (
"math/rand"
"testing"
"time"
)
func TestChannel(t *testing.T) {
eventer := New()
c := make(Channel, 1)
eventer.RegisterChannel("test", c)
eventer.Post("test", "hello world")
res := <-c
if res.Data.(string) != "hello world" {
t.Error("Expected event with data 'hello world'. Got", res.Data)
}
}
func TestFunction(t *testing.T) {
eventer := New()
var data string
eventer.RegisterFunc("test", func(ev Event) {
data = ev.Data.(string)
})
eventer.Post("test", "hello world")
if data != "hello world" {
t.Error("Expected event with data 'hello world'. Got", data)
}
}
func TestRegister(t *testing.T) {
eventer := New()
c := eventer.Register("test")
eventer.Post("test", "hello world")
res := <-c
if res.Data.(string) != "hello world" {
t.Error("Expected event with data 'hello world'. Got", res.Data)
}
}
func TestOn(t *testing.T) {
eventer := New()
c := make(Channel, 1)
eventer.On("test", c)
var data string
eventer.On("test", func(ev Event) {
data = ev.Data.(string)
})
eventer.Post("test", "hello world")
res := <-c
if res.Data.(string) != "hello world" {
t.Error("Expected channel event with data 'hello world'. Got", res.Data)
}
if data != "hello world" {
t.Error("Expected function event with data 'hello world'. Got", data)
}
}
func TestConcurrentUsage(t *testing.T) {
rand.Seed(time.Now().Unix())
eventer := New()
stop := make(chan struct{})
recv := make(chan int)
poster := func() {
for {
select {
case <-stop:
return
default:
eventer.Post("test", "hi")
}
}
}
listener := func(i int) {
time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
c := eventer.Register("test")
// wait for the first event
<-c
recv <- i
// keep receiving to prevent deadlock
for {
select {
case <-stop:
return
case <-c:
}
}
}
nlisteners := 200
go poster()
for i := 0; i < nlisteners; i++ {
go listener(i)
}
// wait until everyone has been served
for i := 0; i < nlisteners; i++ {
<-recv
}
close(stop)
}

@ -0,0 +1,11 @@
package eth
import "container/list"
type PeerListEvent struct {
Peers *list.List
}
type ChainSyncEvent struct {
InSync bool
}

@ -802,7 +802,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
p.versionKnown = true
p.ethereum.PushPeer(p)
p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()})
p.protocolCaps = caps
capsIt := caps.NewIterator()

Loading…
Cancel
Save