From bf1e2631281e1e439533f2abcf1e99a7b2f9552a Mon Sep 17 00:00:00 2001 From: Miya Chen Date: Fri, 18 Aug 2017 18:58:36 +0800 Subject: [PATCH] core, light: send chain events using event.Feed (#14865) --- accounts/abi/bind/backends/simulated.go | 3 +- cmd/utils/flags.go | 3 +- core/bench_test.go | 6 +- core/block_validator_test.go | 9 +- core/blockchain.go | 92 +++++++++--- core/blockchain_test.go | 53 +++---- core/chain_makers.go | 3 +- core/chain_makers_test.go | 5 +- core/dao_test.go | 17 +-- core/genesis_test.go | 4 +- core/tx_pool.go | 101 ++++++++----- core/tx_pool_test.go | 181 +++++++++++++++--------- eth/api_backend.go | 28 ++++ eth/backend.go | 4 +- eth/filters/filter.go | 4 + eth/filters/filter_system.go | 98 +++++++++---- eth/filters/filter_system_test.go | 117 +++++++++++---- eth/filters/filter_test.go | 34 +++-- eth/handler.go | 23 ++- eth/handler_test.go | 2 +- eth/helper_test.go | 11 +- eth/protocol.go | 6 + ethstats/ethstats.go | 63 ++++++--- internal/ethapi/backend.go | 4 + les/api_backend.go | 24 ++++ les/backend.go | 4 +- les/handler.go | 1 + les/helper_test.go | 4 +- les/server.go | 9 +- light/lightchain.go | 57 ++++++-- light/lightchain_test.go | 7 +- light/odr_test.go | 6 +- light/trie_test.go | 3 +- light/txpool.go | 95 ++++++++----- light/txpool_test.go | 8 +- miner/worker.go | 68 +++++++-- tests/block_test_util.go | 3 +- 37 files changed, 787 insertions(+), 373 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 14bf7bd752..e0ee06a0a1 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -61,7 +60,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend { database, _ := ethdb.NewMemDatabase() genesis := core.Genesis{Config: params.AllProtocolChanges, Alloc: alloc} genesis.MustCommit(database) - blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), vm.Config{}) backend := &SimulatedBackend{database: database, blockchain: blockchain, config: genesis.Config} backend.rollback() return backend diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 04728b5c62..b257084719 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -41,7 +41,6 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethstats" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -1103,7 +1102,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai Fatalf("%v", err) } vmcfg := vm.Config{EnablePreimageRecording: ctx.GlobalBool(VMEnableDebugFlag.Name)} - chain, err = core.NewBlockChain(chainDb, config, engine, new(event.TypeMux), vmcfg) + chain, err = core.NewBlockChain(chainDb, config, engine, vmcfg) if err != nil { Fatalf("Can't create BlockChain: %v", err) } diff --git a/core/bench_test.go b/core/bench_test.go index b9250f7d39..ab25c27d39 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -175,8 +174,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Time the insertion of the new chain. // State and blocks are stored in the same DB. - evmux := new(event.TypeMux) - chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer chainman.Stop() b.ReportAllocs() b.ResetTimer() @@ -286,7 +284,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) { if err != nil { b.Fatalf("error opening database at %v: %v", dir, err) } - chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) if err != nil { b.Fatalf("error creating chain: %v", err) } diff --git a/core/block_validator_test.go b/core/block_validator_test.go index c0afc2955f..6d54c2b932 100644 --- a/core/block_validator_test.go +++ b/core/block_validator_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -43,7 +42,7 @@ func TestHeaderVerification(t *testing.T) { headers[i] = block.Header() } // Run the header checker for blocks one-by-one, checking for both valid and invalid nonces - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) defer chain.Stop() for i := 0; i < len(blocks); i++ { @@ -107,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) { var results <-chan error if valid { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } else { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } @@ -174,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) { defer runtime.GOMAXPROCS(old) // Start the verifications and immediately abort - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}) defer chain.Stop() abort, results := chain.engine.VerifyHeaders(chain, headers, seals) diff --git a/core/blockchain.go b/core/blockchain.go index 3fb8be15f4..f3ca4e08cf 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -79,10 +79,16 @@ const ( type BlockChain struct { config *params.ChainConfig // chain & network configuration - hc *HeaderChain - chainDb ethdb.Database - eventMux *event.TypeMux - genesisBlock *types.Block + hc *HeaderChain + chainDb ethdb.Database + rmTxFeed event.Feed + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -115,7 +121,7 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) { +func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co config: config, chainDb: chainDb, stateCache: state.NewDatabase(chainDb), - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } + // Unsubscribe all subscriptions registered from blockchain + bc.scope.Close() close(bc.quit) atomic.StoreInt32(&bc.procInterrupt, 1) @@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) + // We need some control over the mining operation. Acquiring locks and waiting + // for the miner to create new block takes too long and in most cases isn't + // even necessary. + if bc.LastBlockHash() == block.Hash() { + events = append(events, ChainHeadEvent{block}) + } // Write the positional metadata for transaction and receipt lookups if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { @@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { stats.usedGas += usedGas.Uint64() stats.report(chain, i) } - go bc.postChainEvents(events, coalescedLogs) + go bc.PostChainEvents(events, coalescedLogs) return 0, nil } @@ -1184,16 +1197,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Must be posted in a goroutine because of the transaction pool trying // to acquire the chain manager lock if len(diff) > 0 { - go bc.eventMux.Post(RemovedTransactionEvent{diff}) + go bc.rmTxFeed.Send(RemovedTransactionEvent{diff}) } if len(deletedLogs) > 0 { - go bc.eventMux.Post(RemovedLogsEvent{deletedLogs}) + go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } if len(oldChain) > 0 { go func() { for _, block := range oldChain { - bc.eventMux.Post(ChainSideEvent{Block: block}) + bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } }() } @@ -1201,22 +1214,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -// postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. -func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) { +// PostChainEvents iterates over the events generated by a chain insertion and +// posts them into the event feed. +// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. +func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { // post event logs for further processing - bc.eventMux.Post(logs) + if logs != nil { + bc.logsFeed.Send(logs) + } for _, event := range events { - if event, ok := event.(ChainEvent); ok { - // We need some control over the mining operation. Acquiring locks and waiting - // for the miner to create new block takes too long and in most cases isn't - // even necessary. - if bc.LastBlockHash() == event.Hash { - bc.eventMux.Post(ChainHeadEvent{event.Block}) - } + switch ev := event.(type) { + case ChainEvent: + bc.chainFeed.Send(ev) + + case ChainHeadEvent: + bc.chainHeadFeed.Send(ev) + + case ChainSideEvent: + bc.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - bc.eventMux.Post(event) } } @@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } + +// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent. +func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.scope.Track(bc.rmTxFeed.Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent. +func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription { + return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) +} + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription { + return bc.scope.Track(bc.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { + return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent registers a subscription of []*types.Log. +func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return bc.scope.Track(bc.logsFeed.Subscribe(ch)) +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4a0f449408..470974a0a7 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -47,7 +46,7 @@ func newTestBlockChain(fake bool) *BlockChain { if !fake { engine = ethash.NewTester() } - blockchain, err := NewBlockChain(db, gspec.Config, engine, new(event.TypeMux), vm.Config{}) + blockchain, err := NewBlockChain(db, gspec.Config, engine, vm.Config{}) if err != nil { panic(err) } @@ -497,7 +496,7 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Create a new BlockChain and check that it rolled back the state. - ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), vm.Config{}) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } @@ -610,7 +609,7 @@ func TestFastVsFullChains(t *testing.T) { // Import the chain as an archive node for the comparison baseline archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer archive.Stop() if n, err := archive.InsertChain(blocks); err != nil { @@ -619,7 +618,7 @@ func TestFastVsFullChains(t *testing.T) { // Fast import the chain as a non-archive node to test fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -697,7 +696,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) } @@ -710,7 +709,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a non-archive node and ensure all pointers are updated fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -731,7 +730,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { lightDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(lightDb) - light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } @@ -800,8 +799,7 @@ func TestChainTxReorgs(t *testing.T) { } }) // Import the chain. This runs all block validation rules. - evmux := &event.TypeMux{} - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) if i, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert original chain[%d]: %v", i, err) } @@ -872,11 +870,11 @@ func TestLogReorgs(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - var evmux event.TypeMux - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() - subs := evmux.Subscribe(RemovedLogsEvent{}) + rmLogsCh := make(chan RemovedLogsEvent) + blockchain.SubscribeRemovedLogsEvent(rmLogsCh) chain, _ := GenerateChain(params.TestChainConfig, genesis, db, 2, func(i int, gen *BlockGen) { if i == 1 { tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code), signer, key1) @@ -895,9 +893,14 @@ func TestLogReorgs(t *testing.T) { t.Fatalf("failed to insert forked chain: %v", err) } - ev := <-subs.Chan() - if len(ev.Data.(RemovedLogsEvent).Logs) == 0 { - t.Error("expected logs") + timeout := time.NewTimer(1 * time.Second) + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-timeout.C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") } } @@ -914,8 +917,7 @@ func TestReorgSideEvent(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - evmux := &event.TypeMux{} - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, gen *BlockGen) {}) @@ -933,7 +935,8 @@ func TestReorgSideEvent(t *testing.T) { } gen.AddTx(tx) }) - subs := evmux.Subscribe(ChainSideEvent{}) + chainSideCh := make(chan ChainSideEvent) + blockchain.SubscribeChainSideEvent(chainSideCh) if _, err := blockchain.InsertChain(replacementBlocks); err != nil { t.Fatalf("failed to insert chain: %v", err) } @@ -956,8 +959,8 @@ func TestReorgSideEvent(t *testing.T) { done: for { select { - case ev := <-subs.Chan(): - block := ev.Data.(ChainSideEvent).Block + case ev := <-chainSideCh: + block := ev.Block if _, ok := expectedSideHashes[block.Hash()]; !ok { t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash()) } @@ -977,7 +980,7 @@ done: // make sure no more events are fired select { - case e := <-subs.Chan(): + case e := <-chainSideCh: t.Errorf("unexpected event fired: %v", e) case <-time.After(250 * time.Millisecond): } @@ -1038,10 +1041,9 @@ func TestEIP155Transition(t *testing.T) { Alloc: GenesisAlloc{address: {Balance: funds}, deleteAddr: {Balance: new(big.Int)}}, } genesis = gspec.MustCommit(db) - mux event.TypeMux ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, db, 4, func(i int, block *BlockGen) { @@ -1148,9 +1150,8 @@ func TestEIP161AccountRemoval(t *testing.T) { Alloc: GenesisAlloc{address: {Balance: funds}}, } genesis = gspec.MustCommit(db) - mux event.TypeMux ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, block *BlockGen) { diff --git a/core/chain_makers.go b/core/chain_makers.go index 976a8114d3..cb5825d187 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -236,7 +235,7 @@ func newCanonical(n int, full bool) (ethdb.Database, *BlockChain, error) { db, _ := ethdb.NewMemDatabase() genesis := gspec.MustCommit(db) - blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), vm.Config{}) // Create and inject the requested chain if n == 0 { return db, blockchain, nil diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index 28eb76c634..2260c62fbd 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -80,9 +79,7 @@ func ExampleGenerateChain() { }) // Import the chain. This runs all block validation rules. - evmux := &event.TypeMux{} - - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() if i, err := blockchain.InsertChain(chain); err != nil { diff --git a/core/dao_test.go b/core/dao_test.go index 99bf1ecaef..d6e11d78ae 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -42,13 +41,13 @@ func TestDAOForkRangeExtradata(t *testing.T) { proDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(proDb) proConf := ¶ms.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: true} - proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), vm.Config{}) defer proBc.Stop() conDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(conDb) conConf := ¶ms.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: false} - conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), vm.Config{}) defer conBc.Stop() if _, err := proBc.InsertChain(prefix); err != nil { @@ -62,8 +61,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a pro-fork block, and try to feed into the no-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -85,8 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a no-fork block, and try to feed into the pro-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) @@ -109,8 +106,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that contra-forkers accept pro-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -127,8 +123,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that pro-forkers accept contra-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) diff --git a/core/genesis_test.go b/core/genesis_test.go index 8b193759fc..482f861904 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -119,9 +118,8 @@ func TestSetupGenesis(t *testing.T) { // Commit the 'old' genesis block with Homestead transition at #2. // Advance to block #4, past the homestead transition block of customg. genesis := oldcustomg.MustCommit(db) - bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}) defer bc.Stop() - bc.SetValidator(bproc{}) bc.InsertChain(makeBlockChainWithDiff(genesis, []int{2, 3, 4, 5}, 0)) bc.CurrentBlock() diff --git a/core/tx_pool.go b/core/tx_pool.go index b0c251f921..d835c94d1a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -34,6 +34,13 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // rmTxChanSize is the size of channel listening to RemovedTransactionEvent. + rmTxChanSize = 10 +) + var ( // ErrInvalidSender is returned if the transaction contains an invalid signature. ErrInvalidSender = errors.New("invalid sender") @@ -95,7 +102,14 @@ var ( underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) -type stateFn func() (*state.StateDB, error) +// blockChain provides the state of blockchain and current gas limit to do +// some pre checks in tx pool and event subscribers. +type blockChain interface { + State() (*state.StateDB, error) + GasLimit() *big.Int + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription + SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription +} // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { @@ -160,12 +174,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig - currentState stateFn // The state function which will allow us to do some pre checks + blockChain blockChain pendingState *state.ManagedState - gasLimit func() *big.Int // The current gas limit function callback gasPrice *big.Int - eventMux *event.TypeMux - events *event.TypeMuxSubscription + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + rmTxCh chan RemovedTransactionEvent + rmTxSub event.Subscription signer types.Signer mu sync.RWMutex @@ -185,7 +202,7 @@ type TxPool struct { // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -193,17 +210,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e pool := &TxPool{ config: config, chainconfig: chainconfig, + blockChain: blockChain, signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: make(map[common.Hash]*types.Transaction), - eventMux: eventMux, - currentState: currentStateFn, - gasLimit: gasLimitFn, + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) @@ -220,6 +236,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e log.Warn("Failed to rotate transaction journal", "err", err) } } + // Subscribe events from blockchain + pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) + pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh) // Start the event loop and return pool.wg.Add(1) go pool.loop() @@ -248,25 +267,27 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { - // Handle any events fired by the system - case ev, ok := <-pool.events.Chan(): - if !ok { - return - } - switch ev := ev.Data.(type) { - case ChainHeadEvent: - pool.mu.Lock() - if ev.Block != nil { - if pool.chainconfig.IsHomestead(ev.Block.Number()) { - pool.homestead = true - } + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + pool.mu.Lock() + if ev.Block != nil { + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true } - pool.reset() - pool.mu.Unlock() - case RemovedTransactionEvent: - pool.addTxs(ev.Txs, false) } + pool.reset() + pool.mu.Unlock() + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return + + // Handle RemovedTransactionEvent + case ev := <-pool.rmTxCh: + pool.addTxs(ev.Txs, false) + // Be unsubscribed due to system stopped + case <-pool.rmTxSub.Err(): + return // Handle stats reporting ticks case <-report.C: @@ -322,7 +343,7 @@ func (pool *TxPool) lockedReset() { // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. func (pool *TxPool) reset() { - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { log.Error("Failed reset txpool state", "err", err) return @@ -347,7 +368,11 @@ func (pool *TxPool) reset() { // Stop terminates the transaction pool. func (pool *TxPool) Stop() { - pool.events.Unsubscribe() + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() + pool.rmTxSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { @@ -356,6 +381,12 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() @@ -468,7 +499,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { + if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { return ErrGasLimit } // Make sure the transaction is signed properly @@ -482,7 +513,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { return err } @@ -647,7 +678,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - go pool.eventMux.Post(TxPreEvent{tx}) + go pool.txFeed.Send(TxPreEvent{tx}) } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -690,7 +721,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // If we added a new transaction, run promotion checks and return if !replace { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -717,7 +748,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { } // Only reprocess the internal state if something was actually added if len(dirty) > 0 { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -804,7 +835,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Gather all the accounts potentially needing updates if accounts == nil { @@ -973,7 +1004,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index fcb330051c..ee1ddd4bbc 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -44,6 +44,29 @@ func init() { testTxPoolConfig.Journal = "" } +type testBlockChain struct { + statedb *state.StateDB + gasLimit *big.Int + chainHeadFeed *event.Feed + rmTxFeed *event.Feed +} + +func (bc *testBlockChain) State() (*state.StateDB, error) { + return bc.statedb, nil +} + +func (bc *testBlockChain) GasLimit() *big.Int { + return new(big.Int).Set(bc.gasLimit) +} + +func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.chainHeadFeed.Subscribe(ch) +} + +func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.rmTxFeed.Subscribe(ch) +} + func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) } @@ -56,9 +79,10 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} key, _ := crypto.GenerateKey() - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) return pool, key } @@ -96,6 +120,31 @@ func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } +type testChain struct { + *testBlockChain + address common.Address + trigger *bool +} + +// testChain.State() is used multiple times to reset the pending state. +// when simulate is true it will create a state that indicates +// that tx0 and tx1 are included in the chain. +func (c *testChain) State() (*state.StateDB, error) { + // delay "state change" by one. The tx pool fetches the + // state multiple times and by delaying it a bit we simulate + // a state change between those fetches. + stdb := c.statedb + if *c.trigger { + db, _ := ethdb.NewMemDatabase() + c.statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) + // simulate that the new head block included tx0 and tx1 + c.statedb.SetNonce(c.address, 2) + c.statedb.SetBalance(c.address, new(big.Int).SetUint64(params.Ether)) + *c.trigger = false + } + return stdb, nil +} + // This test simulates a scenario where a new block is imported during a // state reset and tests whether the pending state is in sync with the // block head event that initiated the resetState(). @@ -104,38 +153,18 @@ func TestStateChangeDuringPoolReset(t *testing.T) { db, _ = ethdb.NewMemDatabase() key, _ = crypto.GenerateKey() address = crypto.PubkeyToAddress(key.PublicKey) - mux = new(event.TypeMux) statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) trigger = false ) // setup pool with 2 transaction in it statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) + blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger} tx0 := transaction(0, big.NewInt(100000), key) tx1 := transaction(1, big.NewInt(100000), key) - // stateFunc is used multiple times to reset the pending state. - // when simulate is true it will create a state that indicates - // that tx0 and tx1 are included in the chain. - stateFunc := func() (*state.StateDB, error) { - // delay "state change" by one. The tx pool fetches the - // state multiple times and by delaying it a bit we simulate - // a state change between those fetches. - stdb := statedb - if trigger { - statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) - // simulate that the new head block included tx0 and tx1 - statedb.SetNonce(address, 2) - statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) - trigger = false - } - return stdb, nil - } - - gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } - - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() nonce := pool.State().GetNonce(address) @@ -176,7 +205,7 @@ func TestInvalidTransactions(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) @@ -211,7 +240,7 @@ func TestTransactionQueue(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset() pool.enqueueTx(tx.Hash(), tx) @@ -241,7 +270,7 @@ func TestTransactionQueue(t *testing.T) { tx2 := transaction(10, big.NewInt(100), key) tx3 := transaction(11, big.NewInt(100), key) from, _ = deriveSender(tx1) - currentState, _ = pool.currentState() + currentState, _ = pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset() @@ -264,7 +293,7 @@ func TestRemoveTx(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(1)) tx1 := transaction(0, big.NewInt(100), key) @@ -296,7 +325,7 @@ func TestNegativeValue(t *testing.T) { tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) @@ -311,8 +340,8 @@ func TestTransactionChainFork(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.currentState = func() (*state.StateDB, error) { return statedb, nil } - currentState, _ := pool.currentState() + pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() } @@ -339,8 +368,8 @@ func TestTransactionDoubleNonce(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.currentState = func() (*state.StateDB, error) { return statedb, nil } - currentState, _ := pool.currentState() + pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() } @@ -358,7 +387,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if replace, err := pool.add(tx2, false); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } - state, _ := pool.currentState() + state, _ := pool.blockChain.State() pool.promoteExecutables(state, []common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -386,7 +415,7 @@ func TestMissingNonce(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) tx := transaction(1, big.NewInt(100000), key) if _, err := pool.add(tx, false); err != nil { @@ -409,7 +438,7 @@ func TestNonceRecovery(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.SetNonce(addr, n) currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() @@ -431,11 +460,14 @@ func TestRemovedTxEvent(t *testing.T) { tx := transaction(0, big.NewInt(1000000), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000000000000)) pool.lockedReset() - pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) - pool.eventMux.Post(ChainHeadEvent{nil}) + blockChain, _ := pool.blockChain.(*testBlockChain) + blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}}) + blockChain.chainHeadFeed.Send(ChainHeadEvent{nil}) + // wait for handling events + <-time.After(500 * time.Millisecond) if pool.pending[from].Len() != 1 { t.Error("expected 1 pending tx, got", pool.pending[from].Len()) } @@ -453,7 +485,7 @@ func TestTransactionDropping(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000)) // Add some pending and some queued transactions @@ -518,7 +550,7 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.gasLimit = func() *big.Int { return big.NewInt(100) } + pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100) pool.lockedReset() if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { @@ -548,7 +580,7 @@ func TestTransactionPostponing(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000)) // Add a batch consecutive pending transactions for validation @@ -624,7 +656,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) pool.lockedReset() @@ -667,16 +699,17 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -757,19 +790,20 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // Create the pool to test the non-expiration enforcement db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.Lifetime = time.Second config.NoLocals = nolocals - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) @@ -821,7 +855,7 @@ func TestTransactionPendingLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) pool.lockedReset() @@ -853,7 +887,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool1.Stop() account1, _ := deriveSender(transaction(0, big.NewInt(0), key1)) - state1, _ := pool1.currentState() + state1, _ := pool1.blockChain.State() state1.AddBalance(account1, big.NewInt(1000000)) for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { @@ -866,7 +900,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool2.Stop() account2, _ := deriveSender(transaction(0, big.NewInt(0), key2)) - state2, _ := pool2.currentState() + state2, _ := pool2.blockChain.State() state2.AddBalance(account2, big.NewInt(1000000)) txns := []*types.Transaction{} @@ -900,15 +934,16 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -946,17 +981,18 @@ func TestTransactionCapClearsFromAll(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.AccountSlots = 2 config.AccountQueue = 2 config.GlobalSlots = 8 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() key, _ := crypto.GenerateKey() addr := crypto.PubkeyToAddress(key.PublicKey) @@ -980,15 +1016,16 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 0 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -1028,12 +1065,13 @@ func TestTransactionPoolRepricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1112,16 +1150,17 @@ func TestTransactionPoolUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 2 config.GlobalQueue = 2 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1199,14 +1238,15 @@ func TestTransactionReplacement(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a test account to add transactions with key, _ := crypto.GenerateKey() - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000)) // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) @@ -1278,19 +1318,20 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Create the original pool to inject transaction into the journal db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals config.Journal = journal config.Rejournal = time.Second - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - statedb, _ = pool.currentState() + statedb, _ = pool.blockChain.State() statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) @@ -1320,7 +1361,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() if queued != 0 { @@ -1344,7 +1386,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { time.Sleep(2 * config.Rejournal) pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() if pending != 0 { @@ -1377,7 +1420,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1403,7 +1446,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1424,7 +1467,7 @@ func BenchmarkPoolInsert(b *testing.B) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) txs := make(types.Transactions, b.N) @@ -1449,7 +1492,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) batches := make([]types.Transactions, b.N) diff --git a/eth/api_backend.go b/eth/api_backend.go index 7ef7c030dd..abf52326b6 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -115,6 +115,30 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil } +func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedTxEvent(ch) +} + +func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainHeadEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainSideEvent(ch) +} + +func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.BlockChain().SubscribeLogsEvent(ch) +} + func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) } @@ -151,6 +175,10 @@ func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.TxPool().Content() } +func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.eth.TxPool().SubscribeTxPreEvent(ch) +} + func (b *EthApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/eth/backend.go b/eth/backend.go index 8a837f7b88..5f4f2097a7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -137,7 +137,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig) + eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } @@ -151,7 +151,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) maxPeers := config.MaxPeers if config.LightServ > 0 { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index f27b76929e..f848bc6af9 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -34,6 +34,10 @@ type Backend interface { EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription } // Filter can be used to retrieve and filter logs. diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ab0b7473ea..00ade0ffb2 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -54,6 +54,19 @@ const ( LastIndexSubscription ) +const ( + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. + rmLogsChanSize = 10 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + var ( ErrInvalidSubscriptionID = errors.New("invalid id") ) @@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) { +func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { if ev == nil { return } - switch e := ev.Data.(type) { + switch e := ev.(type) { case []*types.Log: if len(e) > 0 { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } } case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - case core.PendingLogsEvent: - for _, f := range filters[PendingLogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs + case *event.TypeMuxEvent: + switch muxe := e.Data.(type) { + case core.PendingLogsEvent: + for _, f := range filters[PendingLogsSubscription] { + if e.Time.After(f.created) { + if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } } } } case core.TxPreEvent: for _, f := range filters[PendingTransactionsSubscription] { - if ev.Time.After(f.created) { - f.hashes <- e.Tx.Hash() - } + f.hashes <- e.Tx.Hash() } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { - if ev.Time.After(f.created) { - f.headers <- e.Block.Header() - } + f.headers <- e.Block.Header() } if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } }) @@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { var ( index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{}) + sub = es.mux.Subscribe(core.PendingLogsEvent{}) + // Subscribe TxPreEvent form txpool + txCh = make(chan core.TxPreEvent, txChanSize) + txSub = es.backend.SubscribeTxPreEvent(txCh) + // Subscribe RemovedLogsEvent + rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) + rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) + // Subscribe []*types.Log + logsCh = make(chan []*types.Log, logsChanSize) + logsSub = es.backend.SubscribeLogsEvent(logsCh) + // Subscribe ChainEvent + chainEvCh = make(chan core.ChainEvent, chainEvChanSize) + chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) ) + // Unsubscribe all events + defer sub.Unsubscribe() + defer txSub.Unsubscribe() + defer rmLogsSub.Unsubscribe() + defer logsSub.Unsubscribe() + defer chainEvSub.Unsubscribe() + for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } @@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() { return } es.broadcast(index, ev) + + // Handle subscribed events + case ev := <-txCh: + es.broadcast(index, ev) + case ev := <-rmLogsCh: + es.broadcast(index, ev) + case ev := <-logsCh: + es.broadcast(index, ev) + case ev := <-chainEvCh: + es.broadcast(index, ev) + case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() { delete(index[f.typ], f.id) } close(f.err) + + // System stopped + case <-txSub.Err(): + return + case <-rmLogsSub.Err(): + return + case <-logsSub.Err(): + return + case <-chainEvSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 23e6d66e18..fcc888b8cf 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -34,8 +34,12 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database + mux *event.TypeMux + db ethdb.Database + txFeed *event.Feed + rmLogsFeed *event.Feed + logsFeed *event.Feed + chainFeed *event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t return core.GetBlockReceipts(b.db, blockHash, num), nil } +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.txFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.rmLogsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.logsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.chainFeed.Subscribe(ch) +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) { var ( mux = new(event.TypeMux) db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} api = NewPublicFilterAPI(backend, false) genesis = new(core.Genesis).MustCommit(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) @@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) { time.Sleep(1 * time.Second) for _, e := range chainEvents { - mux.Post(e) + chainFeed.Send(e) } <-sub0.Err() @@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(1 * time.Second) for _, tx := range transactions { ev := core.TxPreEvent{Tx: tx} - mux.Post(ev) + txFeed.Send(ev) } + timeout := time.Now().Add(1 * time.Second) for { results, err := api.GetFilterChanges(fid0) if err != nil { @@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) { if len(hashes) >= len(transactions) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } + if len(hashes) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + return + } for i := range hashes { if hashes[i] != transactions[i].Hash() { t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) @@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) { } } -// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. +// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed. func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) { // raise events time.Sleep(1 * time.Second) - if err := mux.Post(allLogs); err != nil { - t.Fatal(err) + if nsend := logsFeed.Send(allLogs); nsend == 0 { + t.Fatal("Shoud have at least one subscription") } if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { t.Fatal(err) @@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) { for i, tt := range testCases { var fetched []*types.Log + timeout := time.Now().Add(1 * time.Second) for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { @@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) { if len(fetched) >= len(tt.expected) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } @@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux. +// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) { // raise events time.Sleep(1 * time.Second) + // allLogs are type of core.PendingLogsEvent for _, l := range allLogs { if err := mux.Post(l); err != nil { t.Fatal(err) diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b6cfd4bbc3..3244c04d74 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -49,14 +49,18 @@ func BenchmarkMipmaps(b *testing.B) { defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() @@ -119,11 +123,15 @@ func TestFilters(t *testing.T) { defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) diff --git a/eth/handler.go b/eth/handler.go index e6a9c86d79..9d230a4ad6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,6 +45,10 @@ import ( const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 ) var ( @@ -78,7 +82,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txSub *event.TypeMuxSubscription + txCh chan core.TxPreEvent + txSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -200,7 +205,8 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start() { // broadcast transactions - pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + pm.txCh = make(chan core.TxPreEvent, txChanSize) + pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) @@ -724,10 +730,15 @@ func (self *ProtocolManager) minedBroadcastLoop() { } func (self *ProtocolManager) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.Data.(core.TxPreEvent) - self.BroadcastTx(event.Tx.Hash(), event.Tx) + for { + select { + case event := <-self.txCh: + self.BroadcastTx(event.Tx.Hash(), event.Tx) + + // Err() channel will be closed when unsubscribing. + case <-self.txSub.Err(): + return + } } } diff --git a/eth/handler_test.go b/eth/handler_test.go index ca9c9e1b40..aba2774444 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -474,7 +474,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{}) ) pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db) if err != nil { diff --git a/eth/helper_test.go b/eth/helper_test.go index 546478a3ee..f1dab95283 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -59,7 +59,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, } genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) ) chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { @@ -88,8 +88,9 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i // testTxPool is a fake, helper transaction pool for testing purposes type testTxPool struct { - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + txFeed event.Feed + pool []*types.Transaction // Collection of all transactions + added chan<- []*types.Transaction // Notification channel for new transactions lock sync.RWMutex // Protects the transaction pool } @@ -124,6 +125,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } +func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return p.txFeed.Subscribe(ch) +} + // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize)) diff --git a/eth/protocol.go b/eth/protocol.go index 376e4663e3..2c41376fa8 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -22,7 +22,9 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" ) @@ -100,6 +102,10 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. Pending() (map[common.Address]types.Transactions, error) + + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index b75c5e6da4..bb03dc72b0 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -44,9 +44,27 @@ import ( "golang.org/x/net/websocket" ) -// historyUpdateRange is the number of blocks a node should report upon login or -// history request. -const historyUpdateRange = 50 +const ( + // historyUpdateRange is the number of blocks a node should report upon login or + // history request. + historyUpdateRange = 50 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + +type txPool interface { + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription +} + +type blockChain interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription +} // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. @@ -118,16 +136,22 @@ func (s *Service) Stop() error { // until termination. func (s *Service) loop() { // Subscribe to chain events to execute updates on - var emux *event.TypeMux + var blockchain blockChain + var txpool txPool if s.eth != nil { - emux = s.eth.EventMux() + blockchain = s.eth.BlockChain() + txpool = s.eth.TxPool() } else { - emux = s.les.EventMux() + blockchain = s.les.BlockChain() + txpool = s.les.TxPool() } - headSub := emux.Subscribe(core.ChainHeadEvent{}) + + chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) + headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh) defer headSub.Unsubscribe() - txSub := emux.Subscribe(core.TxPreEvent{}) + txEventCh := make(chan core.TxPreEvent, txChanSize) + txSub := txpool.SubscribeTxPreEvent(txEventCh) defer txSub.Unsubscribe() // Start a goroutine that exhausts the subsciptions to avoid events piling up @@ -139,25 +163,18 @@ func (s *Service) loop() { go func() { var lastTx mclock.AbsTime + HandleLoop: for { select { // Notify of chain head events, but drop if too frequent - case head, ok := <-headSub.Chan(): - if !ok { // node stopped - close(quitCh) - return - } + case head := <-chainHeadCh: select { - case headCh <- head.Data.(core.ChainHeadEvent).Block: + case headCh <- head.Block: default: } // Notify of new transaction events, but drop if too frequent - case _, ok := <-txSub.Chan(): - if !ok { // node stopped - close(quitCh) - return - } + case <-txEventCh: if time.Duration(mclock.Now()-lastTx) < time.Second { continue } @@ -167,8 +184,16 @@ func (s *Service) loop() { case txCh <- struct{}{}: default: } + + // node stopped + case <-txSub.Err(): + break HandleLoop + case <-headSub.Err(): + break HandleLoop } } + close(quitCh) + return }() // Loop reporting until termination for { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d122b7915b..be17ffeae4 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -53,6 +53,9 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetTd(blockHash common.Hash) *big.Int GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error) + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription // TxPool API SendTx(ctx context.Context, signedTx *types.Transaction) error @@ -62,6 +65,7 @@ type Backend interface { GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 7a3c2447c9..1323e88644 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -124,6 +124,30 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.txPool.Content() } +func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.eth.txPool.SubscribeTxPreEvent(ch) +} + +func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.blockchain.SubscribeChainEvent(ch) +} + +func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.blockchain.SubscribeChainHeadEvent(ch) +} + +func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.blockchain.SubscribeChainSideEvent(ch) +} + +func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.blockchain.SubscribeLogsEvent(ch) +} + +func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) +} + func (b *LesApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/les/backend.go b/les/backend.go index a4e7726717..4c33417c03 100644 --- a/les/backend.go +++ b/les/backend.go @@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { eth.serverPool = newServerPool(chainDb, quitSync, ð.wg) eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool) eth.odr = NewLesOdr(chainDb, eth.retriever) - if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil { + if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine); err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. @@ -113,7 +113,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { core.WriteChainConfig(chainDb, genesisHash, chainConfig) } - eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay) + eth.txPool = light.NewTxPool(eth.chainConfig, eth.blockchain, eth.relay) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, ð.wg); err != nil { return nil, err } diff --git a/les/handler.go b/les/handler.go index 234b6e998b..1a75cd369d 100644 --- a/les/handler.go +++ b/les/handler.go @@ -82,6 +82,7 @@ type BlockChain interface { GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash LastBlockHash() common.Hash Genesis() *types.Block + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription } type txPool interface { diff --git a/les/helper_test.go b/les/helper_test.go index 7dccfc4582..b33454e1d0 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -146,9 +146,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor } if lightSync { - chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux) + chain, _ = light.NewLightChain(odr, gspec.Config, engine) } else { - blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) gchain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/les/server.go b/les/server.go index 39ef0efffe..8b27307141 100644 --- a/les/server.go +++ b/les/server.go @@ -271,7 +271,8 @@ func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) { func (pm *ProtocolManager) blockLoop() { pm.wg.Add(1) - sub := pm.eventMux.Subscribe(core.ChainHeadEvent{}) + headCh := make(chan core.ChainHeadEvent, 10) + headSub := pm.blockchain.SubscribeChainHeadEvent(headCh) newCht := make(chan struct{}, 10) newCht <- struct{}{} go func() { @@ -280,10 +281,10 @@ func (pm *ProtocolManager) blockLoop() { lastBroadcastTd := common.Big0 for { select { - case ev := <-sub.Chan(): + case ev := <-headCh: peers := pm.peers.AllPeers() if len(peers) > 0 { - header := ev.Data.(core.ChainHeadEvent).Block.Header() + header := ev.Block.Header() hash := header.Hash() number := header.Number.Uint64() td := core.GetTd(pm.chainDb, hash, number) @@ -319,7 +320,7 @@ func (pm *ProtocolManager) blockLoop() { } }() case <-pm.quitSync: - sub.Unsubscribe() + headSub.Unsubscribe() pm.wg.Done() return } diff --git a/light/lightchain.go b/light/lightchain.go index a51043975f..df194ecadf 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -44,11 +44,14 @@ var ( // headers, downloading block bodies and receipts on demand through an ODR // interface. It only does header validation during chain insertion. type LightChain struct { - hc *core.HeaderChain - chainDb ethdb.Database - odr OdrBackend - eventMux *event.TypeMux - genesisBlock *types.Block + hc *core.HeaderChain + chainDb ethdb.Database + odr OdrBackend + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex chainmu sync.RWMutex @@ -69,7 +72,7 @@ type LightChain struct { // NewLightChain returns a fully initialised light chain using information // available in the database. It initialises the default Ethereum header // validator. -func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux) (*LightChain, error) { +func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine) (*LightChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -77,7 +80,6 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. bc := &LightChain{ chainDb: odr.Database(), odr: odr, - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -316,16 +318,18 @@ func (self *LightChain) Rollback(chain []common.Hash) { } // postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. +// posts them into the event feed. func (self *LightChain) postChainEvents(events []interface{}) { for _, event := range events { - if event, ok := event.(core.ChainEvent); ok { - if self.LastBlockHash() == event.Hash { - self.eventMux.Post(core.ChainHeadEvent{Block: event.Block}) + switch ev := event.(type) { + case core.ChainEvent: + if self.LastBlockHash() == ev.Hash { + self.chainHeadFeed.Send(core.ChainHeadEvent{Block: ev.Block}) } + self.chainFeed.Send(ev) + case core.ChainSideEvent: + self.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - self.eventMux.Post(event) } } @@ -467,3 +471,30 @@ func (self *LightChain) LockChain() { func (self *LightChain) UnlockChain() { self.chainmu.RUnlock() } + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (self *LightChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return self.scope.Track(self.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (self *LightChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return self.scope.Track(self.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (self *LightChain) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return self.scope.Track(self.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent implements the interface of filters.Backend +// LightChain does not send logs events, so return an empty subscription. +func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent implements the interface of filters.Backend +// LightChain does not send core.RemovedLogsEvent, so return an empty subscription. +func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} diff --git a/light/lightchain_test.go b/light/lightchain_test.go index 0ad6405256..40a4d396a8 100644 --- a/light/lightchain_test.go +++ b/light/lightchain_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -55,7 +54,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) { db, _ := ethdb.NewMemDatabase() gspec := core.Genesis{Config: params.TestChainConfig} genesis := gspec.MustCommit(db) - blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker(), new(event.TypeMux)) + blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker()) // Create and inject the requested chain if n == 0 { @@ -75,7 +74,7 @@ func newTestLightChain() *LightChain { Config: params.TestChainConfig, } gspec.MustCommit(db) - lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker(), new(event.TypeMux)) + lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker()) if err != nil { panic(err) } @@ -339,7 +338,7 @@ func TestReorgBadHeaderHashes(t *testing.T) { defer func() { delete(core.BadHashes, headers[3].Hash()) }() // Create a new LightChain and check that it rolled back the state. - ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux)) + ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker()) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } diff --git a/light/odr_test.go b/light/odr_test.go index 544b64effc..bd1e976e81 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -233,7 +232,6 @@ func testChainGen(i int, block *core.BlockGen) { func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { var ( - evmux = new(event.TypeMux) sdb, _ = ethdb.NewMemDatabase() ldb, _ = ethdb.NewMemDatabase() gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}} @@ -241,14 +239,14 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { t.Fatal(err) } odr := &testOdr{sdb: sdb, ldb: ldb} - lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux) + lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) if err != nil { t.Fatal(err) } diff --git a/light/trie_test.go b/light/trie_test.go index 9b2cf7c2b1..5f45c01af9 100644 --- a/light/trie_test.go +++ b/light/trie_test.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" ) @@ -41,7 +40,7 @@ func TestNodeIterator(t *testing.T) { genesis = gspec.MustCommit(fulldb) ) gspec.MustCommit(lightdb) - blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, fulldb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/light/txpool.go b/light/txpool.go index 7cbb991e85..bd215b9928 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -33,6 +33,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + // txPermanent is the number of mined blocks after a mined transaction is // considered permanent and no rollback is expected var txPermanent = uint64(500) @@ -43,21 +48,23 @@ var txPermanent = uint64(500) // always receive all locally signed transactions in the same order as they are // created. type TxPool struct { - config *params.ChainConfig - signer types.Signer - quit chan bool - eventMux *event.TypeMux - events *event.TypeMuxSubscription - mu sync.RWMutex - chain *LightChain - odr OdrBackend - chainDb ethdb.Database - relay TxRelayBackend - head common.Hash - nonce map[common.Address]uint64 // "pending" nonce - pending map[common.Hash]*types.Transaction // pending transactions by tx hash - mined map[common.Hash][]*types.Transaction // mined transactions by block hash - clearIdx uint64 // earliest block nr that can contain mined tx info + config *params.ChainConfig + signer types.Signer + quit chan bool + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + mu sync.RWMutex + chain *LightChain + odr OdrBackend + chainDb ethdb.Database + relay TxRelayBackend + head common.Hash + nonce map[common.Address]uint64 // "pending" nonce + pending map[common.Hash]*types.Transaction // pending transactions by tx hash + mined map[common.Hash][]*types.Transaction // mined transactions by block hash + clearIdx uint64 // earliest block nr that can contain mined tx info homestead bool } @@ -78,23 +85,24 @@ type TxRelayBackend interface { } // NewTxPool creates a new light transaction pool -func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool { +func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool { pool := &TxPool{ - config: config, - signer: types.NewEIP155Signer(config.ChainId), - nonce: make(map[common.Address]uint64), - pending: make(map[common.Hash]*types.Transaction), - mined: make(map[common.Hash][]*types.Transaction), - quit: make(chan bool), - eventMux: eventMux, - events: eventMux.Subscribe(core.ChainHeadEvent{}), - chain: chain, - relay: relay, - odr: chain.Odr(), - chainDb: chain.Odr().Database(), - head: chain.CurrentHeader().Hash(), - clearIdx: chain.CurrentHeader().Number.Uint64(), - } + config: config, + signer: types.NewEIP155Signer(config.ChainId), + nonce: make(map[common.Address]uint64), + pending: make(map[common.Hash]*types.Transaction), + mined: make(map[common.Hash][]*types.Transaction), + quit: make(chan bool), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chain: chain, + relay: relay, + odr: chain.Odr(), + chainDb: chain.Odr().Database(), + head: chain.CurrentHeader().Hash(), + clearIdx: chain.CurrentHeader().Number.Uint64(), + } + // Subscribe events from blockchain + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) go pool.eventLoop() return pool @@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3 // eventLoop processes chain head events and also notifies the tx relay backend // about the new head hash and tx state changes func (pool *TxPool) eventLoop() { - for ev := range pool.events.Chan() { - switch ev.Data.(type) { - case core.ChainHeadEvent: - pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header()) + for { + select { + case ev := <-pool.chainHeadCh: + pool.setNewHead(ev.Block.Header()) // hack in order to avoid hogging the lock; this part will // be replaced by a subsequent PR. time.Sleep(time.Millisecond) + + // System stopped + case <-pool.chainHeadSub.Err(): + return } } } @@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) { // Stop stops the light transaction pool func (pool *TxPool) Stop() { + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() close(pool.quit) - pool.events.Unsubscribe() log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // Stats returns the number of currently pending (locally created) transactions func (pool *TxPool) Stats() (pending int) { pool.mu.RLock() @@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error { // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. - go self.eventMux.Post(core.TxPreEvent{Tx: tx}) + go self.txFeed.Send(core.TxPreEvent{Tx: tx}) } // Print a log message if low enough level is set diff --git a/light/txpool_test.go b/light/txpool_test.go index f23832a417..fe7936ac28 100644 --- a/light/txpool_test.go +++ b/light/txpool_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -82,7 +81,6 @@ func TestTxPool(t *testing.T) { } var ( - evmux = new(event.TypeMux) sdb, _ = ethdb.NewMemDatabase() ldb, _ = ethdb.NewMemDatabase() gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}} @@ -90,7 +88,7 @@ func TestTxPool(t *testing.T) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, poolTestBlocks, txPoolTestChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) @@ -102,9 +100,9 @@ func TestTxPool(t *testing.T) { discard: make(chan int, 1), mined: make(chan int, 1), } - lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux) + lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) txPermanent = 50 - pool := NewTxPool(params.TestChainConfig, evmux, lightchain, relay) + pool := NewTxPool(params.TestChainConfig, lightchain, relay) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() diff --git a/miner/worker.go b/miner/worker.go index dab192c24f..24e03be60a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -41,6 +41,14 @@ import ( const ( resultQueueSize = 10 miningLogAtDepth = 5 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // chainSideChanSize is the size of channel listening to ChainSideEvent. + chainSideChanSize = 10 ) // Agent can register themself with the worker @@ -87,9 +95,14 @@ type worker struct { mu sync.Mutex // update loop - mux *event.TypeMux - events *event.TypeMuxSubscription - wg sync.WaitGroup + mux *event.TypeMux + txCh chan core.TxPreEvent + txSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + chainSideCh chan core.ChainSideEvent + chainSideSub event.Subscription + wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -123,6 +136,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, + txCh: make(chan core.TxPreEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), @@ -133,7 +149,11 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), fullValidation: false, } - worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + // Subscribe TxPreEvent for tx pool + worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe events for blockchain + worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) + worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) go worker.update() go worker.wait() @@ -225,20 +245,28 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - for event := range self.events.Chan() { + defer self.txSub.Unsubscribe() + defer self.chainHeadSub.Unsubscribe() + defer self.chainSideSub.Unsubscribe() + + for { // A real event arrived, process interesting content - switch ev := event.Data.(type) { - case core.ChainHeadEvent: + select { + // Handle ChainHeadEvent + case <-self.chainHeadCh: self.commitNewWork() - case core.ChainSideEvent: + + // Handle ChainSideEvent + case ev := <-self.chainSideCh: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - case core.TxPreEvent: + + // Handle TxPreEvent + case ev := <-self.txCh: // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} txset := types.NewTransactionsByPriceAndNonce(txs) @@ -246,6 +274,14 @@ func (self *worker) update() { self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.currentMu.Unlock() } + + // System stopped + case <-self.txSub.Err(): + return + case <-self.chainHeadSub.Err(): + return + case <-self.chainSideSub.Err(): + return } } } @@ -298,12 +334,18 @@ func (self *worker) wait() { // broadcast before waiting for validation go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) { self.mux.Post(core.NewMinedBlockEvent{Block: block}) - self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + var ( + events []interface{} + coalescedLogs []*types.Log + ) + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if stat == core.CanonStatTy { - self.mux.Post(core.ChainHeadEvent{Block: block}) - self.mux.Post(logs) + events = append(events, core.ChainHeadEvent{Block: block}) + coalescedLogs = logs } + // post blockchain events + self.chain.PostChainEvents(events, coalescedLogs) if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil { log.Warn("Failed writing block receipts", "err", err) } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index c1d433ff39..a789e6d887 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -111,7 +110,7 @@ func (t *BlockTest) Run() error { return fmt.Errorf("genesis block state root does not match test: computed=%x, test=%x", gblock.Root().Bytes()[:6], t.json.Genesis.StateRoot[:6]) } - chain, err := core.NewBlockChain(db, config, ethash.NewShared(), new(event.TypeMux), vm.Config{}) + chain, err := core.NewBlockChain(db, config, ethash.NewShared(), vm.Config{}) if err != nil { return err }