From 7cf56d6f064869cb62b1673f9ee437020c595391 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Thu, 10 Sep 2020 19:27:42 +0200 Subject: [PATCH] miner: use channels instead of atomics in update loop (#21536) This PR changes several different things: - Adds test cases for the miner loop - Stops the worker if it wasn't already stopped in worker.Close() - Uses channels instead of atomics in the miner.update() loop Co-authored-by: Felix Lange --- miner/miner.go | 57 +++++++-------- miner/miner_test.go | 170 ++++++++++++++++++++++++++++++++++++++++++++ miner/worker.go | 1 + 3 files changed, 196 insertions(+), 32 deletions(-) create mode 100644 miner/miner_test.go diff --git a/miner/miner.go b/miner/miner.go index 5249118cae..ec5b4cc085 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -20,7 +20,6 @@ package miner import ( "fmt" "math/big" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -61,19 +60,19 @@ type Miner struct { eth Backend engine consensus.Engine exitCh chan struct{} - - canStart int32 // can start indicates whether we can start the mining operation - shouldStart int32 // should start indicates whether we should start after sync + startCh chan common.Address + stopCh chan struct{} } func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner { miner := &Miner{ - eth: eth, - mux: mux, - engine: engine, - exitCh: make(chan struct{}), - worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), - canStart: 1, + eth: eth, + mux: mux, + engine: engine, + exitCh: make(chan struct{}), + startCh: make(chan common.Address), + stopCh: make(chan struct{}), + worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), } go miner.update() @@ -88,6 +87,7 @@ func (miner *Miner) update() { events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) defer events.Unsubscribe() + shouldStart := false for { select { case ev := <-events.Chan(): @@ -96,47 +96,40 @@ func (miner *Miner) update() { } switch ev.Data.(type) { case downloader.StartEvent: - atomic.StoreInt32(&miner.canStart, 0) - if miner.Mining() { - miner.Stop() - atomic.StoreInt32(&miner.shouldStart, 1) + wasMining := miner.Mining() + miner.worker.stop() + if wasMining { + // Resume mining after sync was finished + shouldStart = true log.Info("Mining aborted due to sync") } case downloader.DoneEvent, downloader.FailedEvent: - shouldStart := atomic.LoadInt32(&miner.shouldStart) == 1 - - atomic.StoreInt32(&miner.canStart, 1) - atomic.StoreInt32(&miner.shouldStart, 0) if shouldStart { - miner.Start(miner.coinbase) + miner.SetEtherbase(miner.coinbase) + miner.worker.start() } - // stop immediately and ignore all further pending events - return } + case addr := <-miner.startCh: + miner.SetEtherbase(addr) + miner.worker.start() + case <-miner.stopCh: + miner.worker.stop() case <-miner.exitCh: + miner.worker.close() return } } } func (miner *Miner) Start(coinbase common.Address) { - atomic.StoreInt32(&miner.shouldStart, 1) - miner.SetEtherbase(coinbase) - - if atomic.LoadInt32(&miner.canStart) == 0 { - log.Info("Network syncing, will start miner afterwards") - return - } - miner.worker.start() + miner.startCh <- coinbase } func (miner *Miner) Stop() { - miner.worker.stop() - atomic.StoreInt32(&miner.shouldStart, 0) + miner.stopCh <- struct{}{} } func (miner *Miner) Close() { - miner.worker.close() close(miner.exitCh) } diff --git a/miner/miner_test.go b/miner/miner_test.go new file mode 100644 index 0000000000..447892b198 --- /dev/null +++ b/miner/miner_test.go @@ -0,0 +1,170 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package miner implements Ethereum block creation and mining. +package miner + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" +) + +type mockBackend struct { + bc *core.BlockChain + txPool *core.TxPool +} + +func NewMockBackend(bc *core.BlockChain, txPool *core.TxPool) *mockBackend { + return &mockBackend{ + bc: bc, + txPool: txPool, + } +} + +func (m *mockBackend) BlockChain() *core.BlockChain { + return m.bc +} + +func (m *mockBackend) TxPool() *core.TxPool { + return m.txPool +} + +type testBlockChain struct { + statedb *state.StateDB + gasLimit uint64 + chainHeadFeed *event.Feed +} + +func (bc *testBlockChain) CurrentBlock() *types.Block { + return types.NewBlock(&types.Header{ + GasLimit: bc.gasLimit, + }, nil, nil, nil, new(trie.Trie)) +} + +func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { + return bc.CurrentBlock() +} + +func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { + return bc.statedb, nil +} + +func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return bc.chainHeadFeed.Subscribe(ch) +} + +func TestMiner(t *testing.T) { + miner, mux := createMiner(t) + miner.Start(common.HexToAddress("0x12345")) + waitForMiningState(t, miner, true) + // Start the downloader + mux.Post(downloader.StartEvent{}) + waitForMiningState(t, miner, false) + // Stop the downloader and wait for the update loop to run + mux.Post(downloader.DoneEvent{}) + waitForMiningState(t, miner, true) + // Start the downloader and wait for the update loop to run + mux.Post(downloader.StartEvent{}) + waitForMiningState(t, miner, false) + // Stop the downloader and wait for the update loop to run + mux.Post(downloader.FailedEvent{}) + waitForMiningState(t, miner, true) +} + +func TestStartStopMiner(t *testing.T) { + miner, _ := createMiner(t) + waitForMiningState(t, miner, false) + miner.Start(common.HexToAddress("0x12345")) + waitForMiningState(t, miner, true) + miner.Stop() + waitForMiningState(t, miner, false) +} + +func TestCloseMiner(t *testing.T) { + miner, _ := createMiner(t) + waitForMiningState(t, miner, false) + miner.Start(common.HexToAddress("0x12345")) + waitForMiningState(t, miner, true) + // Terminate the miner and wait for the update loop to run + miner.Close() + waitForMiningState(t, miner, false) +} + +// waitForMiningState waits until either +// * the desired mining state was reached +// * a timeout was reached which fails the test +func waitForMiningState(t *testing.T, m *Miner, mining bool) { + t.Helper() + + var state bool + for i := 0; i < 100; i++ { + if state = m.Mining(); state == mining { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("Mining() == %t, want %t", state, mining) +} + +func createMiner(t *testing.T) (*Miner, *event.TypeMux) { + // Create Ethash config + config := Config{ + Etherbase: common.HexToAddress("123456789"), + } + // Create chainConfig + memdb := memorydb.New() + chainDB := rawdb.NewDatabase(memdb) + genesis := core.DeveloperGenesisBlock(15, common.HexToAddress("12345")) + chainConfig, _, err := core.SetupGenesisBlock(chainDB, genesis) + if err != nil { + t.Fatalf("can't create new chain config: %v", err) + } + // Create event Mux + mux := new(event.TypeMux) + // Create consensus engine + engine := ethash.New(ethash.Config{}, []string{}, false) + engine.SetThreads(-1) + // Create isLocalBlock + isLocalBlock := func(block *types.Block) bool { + return true + } + // Create Ethereum backend + limit := uint64(1000) + bc, err := core.NewBlockChain(chainDB, new(core.CacheConfig), chainConfig, engine, vm.Config{}, isLocalBlock, &limit) + if err != nil { + t.Fatalf("can't create new chain %v", err) + } + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)} + + pool := core.NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + backend := NewMockBackend(bc, pool) + // Create Miner + return New(backend, &config, chainConfig, mux, engine, isLocalBlock), mux +} diff --git a/miner/worker.go b/miner/worker.go index f042fd8e33..16f4c1c313 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -303,6 +303,7 @@ func (w *worker) isRunning() bool { // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { + atomic.StoreInt32(&w.running, 0) close(w.exitCh) }