From a1783d169732dd34aa8c7d68f411ce741c1a5015 Mon Sep 17 00:00:00 2001 From: gary rong Date: Tue, 14 Aug 2018 23:34:33 +0800 Subject: [PATCH] miner: move agent logic to worker (#17351) * miner: move agent logic to worker * miner: polish * core: persist block before reorg --- core/blockchain.go | 7 +- miner/agent.go | 116 ------- miner/miner.go | 64 ++-- miner/worker.go | 721 ++++++++++++++++++++++++------------------- miner/worker_test.go | 212 +++++++++++++ 5 files changed, 646 insertions(+), 474 deletions(-) delete mode 100644 miner/agent.go create mode 100644 miner/worker_test.go diff --git a/core/blockchain.go b/core/blockchain.go index 62dc261250..0461da7fd9 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -899,9 +899,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil { return NonStatTy, err } - // Write other block data using a batch. - batch := bc.db.NewBatch() - rawdb.WriteBlock(batch, block) + rawdb.WriteBlock(bc.db, block) root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { @@ -955,6 +953,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. } } } + + // Write other block data using a batch. + batch := bc.db.NewBatch() rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) // If the total difficulty is higher than our known, add it to the canonical chain diff --git a/miner/agent.go b/miner/agent.go deleted file mode 100644 index e922ea153c..0000000000 --- a/miner/agent.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package miner - -import ( - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/log" -) - -type CpuAgent struct { - mu sync.Mutex - - taskCh chan *Package - returnCh chan<- *Package - stop chan struct{} - quitCurrentOp chan struct{} - - chain consensus.ChainReader - engine consensus.Engine - - started int32 // started indicates whether the agent is currently started -} - -func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { - agent := &CpuAgent{ - chain: chain, - engine: engine, - stop: make(chan struct{}, 1), - taskCh: make(chan *Package, 1), - } - return agent -} - -func (self *CpuAgent) AssignTask(p *Package) { - if atomic.LoadInt32(&self.started) == 1 { - self.taskCh <- p - } -} -func (self *CpuAgent) DeliverTo(ch chan<- *Package) { self.returnCh = ch } - -func (self *CpuAgent) Start() { - if !atomic.CompareAndSwapInt32(&self.started, 0, 1) { - return // agent already started - } - go self.update() -} - -func (self *CpuAgent) Stop() { - if !atomic.CompareAndSwapInt32(&self.started, 1, 0) { - return // agent already stopped - } - self.stop <- struct{}{} -done: - // Empty work channel - for { - select { - case <-self.taskCh: - default: - break done - } - } -} - -func (self *CpuAgent) update() { -out: - for { - select { - case p := <-self.taskCh: - self.mu.Lock() - if self.quitCurrentOp != nil { - close(self.quitCurrentOp) - } - self.quitCurrentOp = make(chan struct{}) - go self.mine(p, self.quitCurrentOp) - self.mu.Unlock() - case <-self.stop: - self.mu.Lock() - if self.quitCurrentOp != nil { - close(self.quitCurrentOp) - self.quitCurrentOp = nil - } - self.mu.Unlock() - break out - } - } -} - -func (self *CpuAgent) mine(p *Package, stop <-chan struct{}) { - var err error - if p.Block, err = self.engine.Seal(self.chain, p.Block, stop); p.Block != nil { - log.Info("Successfully sealed new block", "number", p.Block.Number(), "hash", p.Block.Hash()) - self.returnCh <- p - } else { - if err != nil { - log.Warn("Block sealing failed", "err", err) - } - self.returnCh <- nil - } -} diff --git a/miner/miner.go b/miner/miner.go index 4c5717c8ad..e350e456e9 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -21,14 +21,12 @@ import ( "fmt" "sync/atomic" - "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" @@ -36,10 +34,8 @@ import ( // Backend wraps all methods required for mining. type Backend interface { - AccountManager() *accounts.Manager BlockChain() *core.BlockChain TxPool() *core.TxPool - ChainDb() ethdb.Database } // Miner creates blocks and searches for proof-of-work values. @@ -49,6 +45,7 @@ type Miner struct { coinbase common.Address eth Backend engine consensus.Engine + exitCh chan struct{} canStart int32 // can start indicates whether we can start the mining operation shouldStart int32 // should start indicates whether we should start after sync @@ -59,10 +56,10 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con eth: eth, mux: mux, engine: engine, + exitCh: make(chan struct{}), worker: newWorker(config, engine, eth, mux), canStart: 1, } - miner.Register(NewCpuAgent(eth.BlockChain(), engine)) go miner.update() return miner @@ -74,28 +71,35 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con // and halt your mining operation for as long as the DOS continues. func (self *Miner) update() { events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) -out: - for ev := range events.Chan() { - switch ev.Data.(type) { - case downloader.StartEvent: - atomic.StoreInt32(&self.canStart, 0) - if self.Mining() { - self.Stop() - atomic.StoreInt32(&self.shouldStart, 1) - log.Info("Mining aborted due to sync") - } - case downloader.DoneEvent, downloader.FailedEvent: - shouldStart := atomic.LoadInt32(&self.shouldStart) == 1 + defer events.Unsubscribe() - atomic.StoreInt32(&self.canStart, 1) - atomic.StoreInt32(&self.shouldStart, 0) - if shouldStart { - self.Start(self.coinbase) + for { + select { + case ev := <-events.Chan(): + if ev == nil { + return + } + switch ev.Data.(type) { + case downloader.StartEvent: + atomic.StoreInt32(&self.canStart, 0) + if self.Mining() { + self.Stop() + atomic.StoreInt32(&self.shouldStart, 1) + log.Info("Mining aborted due to sync") + } + case downloader.DoneEvent, downloader.FailedEvent: + shouldStart := atomic.LoadInt32(&self.shouldStart) == 1 + + atomic.StoreInt32(&self.canStart, 1) + atomic.StoreInt32(&self.shouldStart, 0) + if shouldStart { + self.Start(self.coinbase) + } + // stop immediately and ignore all further pending events + return } - // unsubscribe. we're only interested in this event once - events.Unsubscribe() - // stop immediately and ignore all further pending events - break out + case <-self.exitCh: + return } } } @@ -109,7 +113,6 @@ func (self *Miner) Start(coinbase common.Address) { return } self.worker.start() - self.worker.commitNewWork() } func (self *Miner) Stop() { @@ -117,12 +120,9 @@ func (self *Miner) Stop() { atomic.StoreInt32(&self.shouldStart, 0) } -func (self *Miner) Register(agent Agent) { - self.worker.register(agent) -} - -func (self *Miner) Unregister(agent Agent) { - self.worker.unregister(agent) +func (self *Miner) Close() { + self.worker.close() + close(self.exitCh) } func (self *Miner) Mining() bool { diff --git a/miner/worker.go b/miner/worker.go index ae695f0198..81a63c29a4 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -32,16 +32,14 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) const ( - resultQueueSize = 10 - miningLogAtDepth = 5 - + // resultQueueSize is the size of channel listening to sealing result. + resultQueueSize = 10 // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 @@ -49,17 +47,10 @@ const ( chainHeadChanSize = 10 // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 + miningLogAtDepth = 5 ) -// Agent can register themselves with the worker -type Agent interface { - AssignTask(*Package) - DeliverTo(chan<- *Package) - Start() - Stop() -} - -// Env is the workers current environment and holds all of the current state information. +// Env is the worker's current environment and holds all of the current state information. type Env struct { config *params.ChainConfig signer types.Signer @@ -74,25 +65,124 @@ type Env struct { header *types.Header txs []*types.Transaction receipts []*types.Receipt +} - createdAt time.Time +func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) { + snap := env.state.Snapshot() + + receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{}) + if err != nil { + env.state.RevertToSnapshot(snap) + return err, nil + } + env.txs = append(env.txs, tx) + env.receipts = append(env.receipts, receipt) + + return nil, receipt.Logs } -// Package contains all information for consensus engine sealing and result submitting. -type Package struct { - Receipts []*types.Receipt - State *state.StateDB - Block *types.Block +func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + } + + var coalescedLogs []*types.Log + + for { + // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !env.config.IsEIP155(env.header.Number) { + log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block) + + txs.Pop() + continue + } + // Start executing the transaction + env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) + + err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) + switch err { + case core.ErrGasLimitReached: + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Trace("Gas limit exceeded for current block", "sender", from) + txs.Pop() + + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + + case nil: + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + txs.Shift() + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() + } + } + + if len(coalescedLogs) > 0 || env.tcount > 0 { + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + go func(logs []*types.Log, tcount int) { + if len(logs) > 0 { + mux.Post(core.PendingLogsEvent{Logs: logs}) + } + if tcount > 0 { + mux.Post(core.PendingStateEvent{}) + } + }(cpy, env.tcount) + } } -// worker is the main object which takes care of applying messages to the new state +// task contains all information for consensus engine sealing and result submitting. +type task struct { + receipts []*types.Receipt + state *state.StateDB + block *types.Block + createdAt time.Time +} + +// worker is the main object which takes care of submitting new work to consensus engine +// and gathering the sealing result. type worker struct { config *params.ChainConfig engine consensus.Engine + eth Backend + chain *core.BlockChain - mu sync.Mutex - - // update loop + // Subscriptions mux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription @@ -101,31 +191,30 @@ type worker struct { chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - agents map[Agent]struct{} - recv chan *Package + // Channels + newWork chan struct{} + taskCh chan *task + resultCh chan *task + exitCh chan struct{} - eth Backend - chain *core.BlockChain - proc core.Validator - chainDb ethdb.Database + current *Env // An environment for current running cycle. + possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. + unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. + mu sync.RWMutex // The lock used to protect the coinbase and extra fields coinbase common.Address extra []byte - currentMu sync.Mutex - current *Env - - snapshotMu sync.RWMutex + snapshotMu sync.RWMutex // The lock used to protect the block snapshot and state snapshot snapshotBlock *types.Block snapshotState *state.StateDB - uncleMu sync.Mutex - possibleUncles map[common.Hash]*types.Block - - unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations - // atomic status counters running int32 // The indicator whether the consensus engine is running or not. + + // Test hooks + newTaskHook func(*task) // Method to call upon receiving a new sealing task + fullTaskInterval func() // Method to call before pushing the full sealing task } func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { @@ -134,220 +223,274 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, engine: engine, eth: eth, mux: mux, - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - chainDb: eth.ChainDb(), - recv: make(chan *Package, resultQueueSize), chain: eth.BlockChain(), - proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), - agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWork: make(chan struct{}, 1), + taskCh: make(chan *task), + resultCh: make(chan *task, resultQueueSize), + exitCh: make(chan struct{}), } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) - go worker.update() - go worker.wait() - worker.commitNewWork() + go worker.mainLoop() + go worker.resultLoop() + go worker.taskLoop() + // Submit first work to initialize pending state. + worker.newWork <- struct{}{} return worker } -func (self *worker) setEtherbase(addr common.Address) { - self.mu.Lock() - defer self.mu.Unlock() - self.coinbase = addr +// setEtherbase sets the etherbase used to initialize the block coinbase field. +func (w *worker) setEtherbase(addr common.Address) { + w.mu.Lock() + defer w.mu.Unlock() + w.coinbase = addr } -func (self *worker) setExtra(extra []byte) { - self.mu.Lock() - defer self.mu.Unlock() - self.extra = extra +// setExtra sets the content used to initialize the block extra field. +func (w *worker) setExtra(extra []byte) { + w.mu.Lock() + defer w.mu.Unlock() + w.extra = extra } -func (self *worker) pending() (*types.Block, *state.StateDB) { +// pending returns the pending state and corresponding block. +func (w *worker) pending() (*types.Block, *state.StateDB) { // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock, self.snapshotState.Copy() + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + if w.snapshotState == nil { + return nil, nil + } + return w.snapshotBlock, w.snapshotState.Copy() } -func (self *worker) pendingBlock() *types.Block { +// pendingBlock returns pending block. +func (w *worker) pendingBlock() *types.Block { // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + return w.snapshotBlock } -func (self *worker) start() { - self.mu.Lock() - defer self.mu.Unlock() - atomic.StoreInt32(&self.running, 1) - for agent := range self.agents { - agent.Start() - } +// start sets the running status as 1 and triggers new work submitting. +func (w *worker) start() { + atomic.StoreInt32(&w.running, 1) + w.newWork <- struct{}{} } -func (self *worker) stop() { - self.mu.Lock() - defer self.mu.Unlock() - - atomic.StoreInt32(&self.running, 0) - for agent := range self.agents { - agent.Stop() - } +// stop sets the running status as 0. +func (w *worker) stop() { + atomic.StoreInt32(&w.running, 0) } -func (self *worker) isRunning() bool { - return atomic.LoadInt32(&self.running) == 1 +// isRunning returns an indicator whether worker is running or not. +func (w *worker) isRunning() bool { + return atomic.LoadInt32(&w.running) == 1 } -func (self *worker) register(agent Agent) { - self.mu.Lock() - defer self.mu.Unlock() - self.agents[agent] = struct{}{} - agent.DeliverTo(self.recv) - if self.isRunning() { - agent.Start() +// close terminates all background threads maintained by the worker and cleans up buffered channels. +// Note the worker does not support being closed multiple times. +func (w *worker) close() { + close(w.exitCh) + // Clean up buffered channels + for empty := false; !empty; { + select { + case <-w.resultCh: + default: + empty = true + } } } -func (self *worker) unregister(agent Agent) { - self.mu.Lock() - defer self.mu.Unlock() - delete(self.agents, agent) - agent.Stop() -} - -func (self *worker) update() { - defer self.txsSub.Unsubscribe() - defer self.chainHeadSub.Unsubscribe() - defer self.chainSideSub.Unsubscribe() +// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event. +func (w *worker) mainLoop() { + defer w.txsSub.Unsubscribe() + defer w.chainHeadSub.Unsubscribe() + defer w.chainSideSub.Unsubscribe() for { - // A real event arrived, process interesting content select { - // Handle ChainHeadEvent - case <-self.chainHeadCh: - self.commitNewWork() - - // Handle ChainSideEvent - case ev := <-self.chainSideCh: - self.uncleMu.Lock() - self.possibleUncles[ev.Block.Hash()] = ev.Block - self.uncleMu.Unlock() - - // Handle NewTxsEvent - case ev := <-self.txsCh: + case <-w.newWork: + // Submit a work when the worker is created or started. + w.commitNewWork() + + case <-w.chainHeadCh: + // Resubmit a work for new cycle once worker receives chain head event. + w.commitNewWork() + + case ev := <-w.chainSideCh: + // Add side block to possible uncle block set. + w.possibleUncles[ev.Block.Hash()] = ev.Block + + case ev := <-w.txsCh: // Apply transactions to the pending state if we're not mining. // // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if !self.isRunning() { - self.currentMu.Lock() + if !w.isRunning() && w.current != nil { + w.mu.Lock() + coinbase := w.coinbase + w.mu.Unlock() + txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { - acc, _ := types.Sender(self.current.signer, tx) + acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } - txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) - self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) - self.updateSnapshot() - self.currentMu.Unlock() + txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs) + w.current.commitTransactions(w.mux, txset, w.chain, coinbase) + w.updateSnapshot() } else { // If we're mining, but nothing is being processed, wake on new transactions - if self.config.Clique != nil && self.config.Clique.Period == 0 { - self.commitNewWork() + if w.config.Clique != nil && w.config.Clique.Period == 0 { + w.commitNewWork() } } // System stopped - case <-self.txsSub.Err(): + case <-w.exitCh: return - case <-self.chainHeadSub.Err(): + case <-w.txsSub.Err(): return - case <-self.chainSideSub.Err(): + case <-w.chainHeadSub.Err(): + return + case <-w.chainSideSub.Err(): return } } } -func (self *worker) wait() { +// seal pushes a sealing task to consensus engine and submits the result. +func (w *worker) seal(t *task, stop <-chan struct{}) { + var ( + err error + res *task + ) + + if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil { + log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(), + "elapsed", common.PrettyDuration(time.Since(t.createdAt))) + res = t + } else { + if err != nil { + log.Warn("Block sealing failed", "err", err) + } + res = nil + } + select { + case w.resultCh <- res: + case <-w.exitCh: + } +} + +// taskLoop is a standalone goroutine to fetch sealing task from the generator and +// push them to consensus engine. +func (w *worker) taskLoop() { + var stopCh chan struct{} + + // interrupt aborts the in-flight sealing task. + interrupt := func() { + if stopCh != nil { + close(stopCh) + stopCh = nil + } + } for { - for result := range self.recv { + select { + case task := <-w.taskCh: + if w.newTaskHook != nil { + w.newTaskHook(task) + } + interrupt() + stopCh = make(chan struct{}) + go w.seal(task, stopCh) + case <-w.exitCh: + interrupt() + return + } + } +} +// resultLoop is a standalone goroutine to handle sealing result submitting +// and flush relative data to the database. +func (w *worker) resultLoop() { + for { + select { + case result := <-w.resultCh: if result == nil { continue } - block := result.Block + block := result.block // Update the block hash in all logs since it is now available and not when the // receipt/log of individual transactions were created. - for _, r := range result.Receipts { + for _, r := range result.receipts { for _, l := range r.Logs { l.BlockHash = block.Hash() } } - for _, log := range result.State.Logs() { + for _, log := range result.state.Logs() { log.BlockHash = block.Hash() } - self.currentMu.Lock() - stat, err := self.chain.WriteBlockWithState(block, result.Receipts, result.State) - self.currentMu.Unlock() + // Commit block and state to database. + stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state) if err != nil { log.Error("Failed writing block to chain", "err", err) continue } // Broadcast the block and announce chain insertion event - self.mux.Post(core.NewMinedBlockEvent{Block: block}) + w.mux.Post(core.NewMinedBlockEvent{Block: block}) var ( events []interface{} - logs = result.State.Logs() + logs = result.state.Logs() ) - events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) - if stat == core.CanonStatTy { + switch stat { + case core.CanonStatTy: + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) events = append(events, core.ChainHeadEvent{Block: block}) + case core.SideStatTy: + events = append(events, core.ChainSideEvent{Block: block}) } - self.chain.PostChainEvents(events, logs) + w.chain.PostChainEvents(events, logs) - // Insert the block into the set of pending ones to wait for confirmations - self.unconfirmed.Insert(block.NumberU64(), block.Hash()) - } - } -} + // Insert the block into the set of pending ones to resultLoop for confirmations + w.unconfirmed.Insert(block.NumberU64(), block.Hash()) -// push sends a new work task to currently live miner agents. -func (self *worker) push(p *Package) { - for agent := range self.agents { - agent.AssignTask(p) + case <-w.exitCh: + return + } } } // makeCurrent creates a new environment for the current cycle. -func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error { - state, err := self.chain.StateAt(parent.Root()) +func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { + state, err := w.chain.StateAt(parent.Root()) if err != nil { return err } env := &Env{ - config: self.config, - signer: types.NewEIP155Signer(self.config.ChainID), + config: w.config, + signer: types.NewEIP155Signer(w.config.ChainID), state: state, ancestors: mapset.NewSet(), family: mapset.NewSet(), uncles: mapset.NewSet(), header: header, - createdAt: time.Now(), } // when 08 is processed ancestors contain 07 (quick block) - for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) { + for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) { for _, uncle := range ancestor.Uncles() { env.family.Add(uncle.Hash()) } @@ -357,20 +500,63 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error // Keep track of transactions which return errors so they can be removed env.tcount = 0 - self.current = env + w.current = env return nil } -func (self *worker) commitNewWork() { - self.mu.Lock() - defer self.mu.Unlock() - self.uncleMu.Lock() - defer self.uncleMu.Unlock() - self.currentMu.Lock() - defer self.currentMu.Unlock() +// commitUncle adds the given block to uncle block set, returns error if failed to add. +func (w *worker) commitUncle(env *Env, uncle *types.Header) error { + hash := uncle.Hash() + if env.uncles.Contains(hash) { + return fmt.Errorf("uncle not unique") + } + if !env.ancestors.Contains(uncle.ParentHash) { + return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4]) + } + if env.family.Contains(hash) { + return fmt.Errorf("uncle already in family (%x)", hash) + } + env.uncles.Add(uncle.Hash()) + return nil +} + +// updateSnapshot updates pending snapshot block and state. +// Note this function assumes the current variable is thread safe. +func (w *worker) updateSnapshot() { + w.snapshotMu.Lock() + defer w.snapshotMu.Unlock() + + var uncles []*types.Header + w.current.uncles.Each(func(item interface{}) bool { + hash, ok := item.(common.Hash) + if !ok { + return false + } + uncle, exist := w.possibleUncles[hash] + if !exist { + return false + } + uncles = append(uncles, uncle.Header()) + return true + }) + + w.snapshotBlock = types.NewBlock( + w.current.header, + w.current.txs, + uncles, + w.current.receipts, + ) + + w.snapshotState = w.current.state.Copy() +} + +// commitNewWork generates several new sealing tasks based on the parent block. +func (w *worker) commitNewWork() { + w.mu.RLock() + defer w.mu.RUnlock() tstart := time.Now() - parent := self.chain.CurrentBlock() + parent := w.chain.CurrentBlock() tstamp := tstart.Unix() if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 { @@ -388,28 +574,28 @@ func (self *worker) commitNewWork() { ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent), - Extra: self.extra, + Extra: w.extra, Time: big.NewInt(tstamp), } // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) - if self.isRunning() { - if self.coinbase == (common.Address{}) { + if w.isRunning() { + if w.coinbase == (common.Address{}) { log.Error("Refusing to mine without etherbase") return } - header.Coinbase = self.coinbase + header.Coinbase = w.coinbase } - if err := self.engine.Prepare(self.chain, header); err != nil { + if err := w.engine.Prepare(w.chain, header); err != nil { log.Error("Failed to prepare header for mining", "err", err) return } // If we are care about TheDAO hard-fork check whether to override the extra-data or not - if daoBlock := self.config.DAOForkBlock; daoBlock != nil { + if daoBlock := w.config.DAOForkBlock; daoBlock != nil { // Check whether the block is among the fork extra-override range limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange) if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 { // Depending whether we support or oppose the fork, override differently - if self.config.DAOForkSupport { + if w.config.DAOForkSupport { header.Extra = common.CopyBytes(params.DAOForkBlockExtra) } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) { header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data @@ -417,14 +603,14 @@ func (self *worker) commitNewWork() { } } // Could potentially happen if starting to mine in an odd state. - err := self.makeCurrent(parent, header) + err := w.makeCurrent(parent, header) if err != nil { log.Error("Failed to create mining context", "err", err) return } // Create the current work task and check any fork transitions needed - env := self.current - if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { + env := w.current + if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(env.state) } @@ -433,11 +619,11 @@ func (self *worker) commitNewWork() { uncles []*types.Header badUncles []common.Hash ) - for hash, uncle := range self.possibleUncles { + for hash, uncle := range w.possibleUncles { if len(uncles) == 2 { break } - if err := self.commitUncle(env, uncle.Header()); err != nil { + if err := w.commitUncle(env, uncle.Header()); err != nil { log.Trace("Bad uncle found and will be removed", "hash", hash) log.Trace(fmt.Sprint(uncle)) @@ -448,184 +634,73 @@ func (self *worker) commitNewWork() { } } for _, hash := range badUncles { - delete(self.possibleUncles, hash) + delete(w.possibleUncles, hash) } var ( - emptyBlock *types.Block - fullBlock *types.Block + emptyBlock, fullBlock *types.Block + emptyState, fullState *state.StateDB ) // Create an empty block based on temporary copied state for sealing in advance without waiting block // execution finished. - emptyState := env.state.Copy() - if emptyBlock, err = self.engine.Finalize(self.chain, header, emptyState, nil, uncles, nil); err != nil { + emptyState = env.state.Copy() + if emptyBlock, err = w.engine.Finalize(w.chain, header, emptyState, nil, uncles, nil); err != nil { log.Error("Failed to finalize block for temporary sealing", "err", err) } else { // Push empty work in advance without applying pending transaction. // The reason is transactions execution can cost a lot and sealer need to // take advantage of this part time. - if self.isRunning() { - log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles)) - self.push(&Package{nil, emptyState, emptyBlock}) + if w.isRunning() { + select { + case w.taskCh <- &task{receipts: nil, state: emptyState, block: emptyBlock, createdAt: time.Now()}: + log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles)) + case <-w.exitCh: + log.Info("Worker has exited") + return + } } } // Fill the block with all available pending transactions. - pending, err := self.eth.TxPool().Pending() + pending, err := w.eth.TxPool().Pending() if err != nil { log.Error("Failed to fetch pending transactions", "err", err) return } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - env.commitTransactions(self.mux, txs, self.chain, self.coinbase) + // Short circuit if there is no available pending transactions + if len(pending) == 0 { + w.updateSnapshot() + return + } + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending) + env.commitTransactions(w.mux, txs, w.chain, w.coinbase) // Create the full block to seal with the consensus engine - if fullBlock, err = self.engine.Finalize(self.chain, header, env.state, env.txs, uncles, env.receipts); err != nil { + fullState = env.state.Copy() + if fullBlock, err = w.engine.Finalize(w.chain, header, fullState, env.txs, uncles, env.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } - // We only care about logging if we're actually mining. - if self.isRunning() { - log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) - self.unconfirmed.Shift(fullBlock.NumberU64() - 1) - self.push(&Package{env.receipts, env.state, fullBlock}) - } - self.updateSnapshot() -} - -func (self *worker) commitUncle(env *Env, uncle *types.Header) error { - hash := uncle.Hash() - if env.uncles.Contains(hash) { - return fmt.Errorf("uncle not unique") - } - if !env.ancestors.Contains(uncle.ParentHash) { - return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4]) - } - if env.family.Contains(hash) { - return fmt.Errorf("uncle already in family (%x)", hash) - } - env.uncles.Add(uncle.Hash()) - return nil -} - -func (self *worker) updateSnapshot() { - self.snapshotMu.Lock() - defer self.snapshotMu.Unlock() - - var uncles []*types.Header - self.current.uncles.Each(func(item interface{}) bool { - if header, ok := item.(*types.Header); ok { - uncles = append(uncles, header) - return true - } - return false - }) - - self.snapshotBlock = types.NewBlock( - self.current.header, - self.current.txs, - uncles, - self.current.receipts, - ) - self.snapshotState = self.current.state.Copy() -} - -func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { - if env.gasPool == nil { - env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + // Deep copy receipts here to avoid interaction between different tasks. + cpy := make([]*types.Receipt, len(env.receipts)) + for i, l := range env.receipts { + cpy[i] = new(types.Receipt) + *cpy[i] = *l } - - var coalescedLogs []*types.Log - - for { - // If we don't have enough gas for any further transactions then we're done - if env.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - break - } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - break - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - // - // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(env.signer, tx) - // Check whether the tx is replay protected. If we're not in the EIP155 hf - // phase, start ignoring the sender until we do. - if tx.Protected() && !env.config.IsEIP155(env.header.Number) { - log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block) - - txs.Pop() - continue - } - // Start executing the transaction - env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) - - err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) - switch err { - case core.ErrGasLimitReached: - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txs.Pop() - - case core.ErrNonceTooLow: - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case core.ErrNonceTooHigh: - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) - txs.Pop() - - case nil: - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - env.tcount++ - txs.Shift() - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txs.Shift() + // We only care about logging if we're actually mining. + if w.isRunning() { + if w.fullTaskInterval != nil { + w.fullTaskInterval() } - } - if len(coalescedLogs) > 0 || env.tcount > 0 { - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l + select { + case w.taskCh <- &task{receipts: cpy, state: fullState, block: fullBlock, createdAt: time.Now()}: + w.unconfirmed.Shift(fullBlock.NumberU64() - 1) + log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + case <-w.exitCh: + log.Info("Worker has exited") } - go func(logs []*types.Log, tcount int) { - if len(logs) > 0 { - mux.Post(core.PendingLogsEvent{Logs: logs}) - } - if tcount > 0 { - mux.Post(core.PendingStateEvent{}) - } - }(cpy, env.tcount) - } -} - -func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) { - snap := env.state.Snapshot() - - receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{}) - if err != nil { - env.state.RevertToSnapshot(snap) - return err, nil } - env.txs = append(env.txs, tx) - env.receipts = append(env.receipts, receipt) - - return nil, receipt.Logs + w.updateSnapshot() } diff --git a/miner/worker_test.go b/miner/worker_test.go new file mode 100644 index 0000000000..5823a608ef --- /dev/null +++ b/miner/worker_test.go @@ -0,0 +1,212 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/clique" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" +) + +var ( + // Test chain configurations + testTxPoolConfig core.TxPoolConfig + ethashChainConfig *params.ChainConfig + cliqueChainConfig *params.ChainConfig + + // Test accounts + testBankKey, _ = crypto.GenerateKey() + testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) + testBankFunds = big.NewInt(1000000000000000000) + + acc1Key, _ = crypto.GenerateKey() + acc1Addr = crypto.PubkeyToAddress(acc1Key.PublicKey) + + // Test transactions + pendingTxs []*types.Transaction + newTxs []*types.Transaction +) + +func init() { + testTxPoolConfig = core.DefaultTxPoolConfig + testTxPoolConfig.Journal = "" + ethashChainConfig = params.TestChainConfig + cliqueChainConfig = params.TestChainConfig + cliqueChainConfig.Clique = ¶ms.CliqueConfig{ + Period: 1, + Epoch: 30000, + } + tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey) + pendingTxs = append(pendingTxs, tx1) + tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey) + newTxs = append(newTxs, tx2) +} + +// testWorkerBackend implements worker.Backend interfaces and wraps all information needed during the testing. +type testWorkerBackend struct { + db ethdb.Database + txPool *core.TxPool + chain *core.BlockChain + testTxFeed event.Feed +} + +func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) *testWorkerBackend { + var ( + db = ethdb.NewMemDatabase() + gspec = core.Genesis{ + Config: chainConfig, + Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, + } + ) + + switch engine.(type) { + case *clique.Clique: + gspec.ExtraData = make([]byte, 32+common.AddressLength+65) + copy(gspec.ExtraData[32:], testBankAddress[:]) + case *ethash.Ethash: + default: + t.Fatal("unexpect consensus engine type") + } + gspec.MustCommit(db) + + chain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) + txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain) + + return &testWorkerBackend{ + db: db, + chain: chain, + txPool: txpool, + } +} + +func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } +func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool } +func (b *testWorkerBackend) PostChainEvents(events []interface{}) { + b.chain.PostChainEvents(events, nil) +} + +func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) (*worker, *testWorkerBackend) { + backend := newTestWorkerBackend(t, chainConfig, engine) + backend.txPool.AddLocals(pendingTxs) + w := newWorker(chainConfig, engine, backend, new(event.TypeMux)) + w.setEtherbase(testBankAddress) + return w, backend +} + +func TestPendingStateAndBlockEthash(t *testing.T) { + testPendingStateAndBlock(t, ethashChainConfig, ethash.NewFaker()) +} +func TestPendingStateAndBlockClique(t *testing.T) { + testPendingStateAndBlock(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase())) +} + +func testPendingStateAndBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { + defer engine.Close() + + w, b := newTestWorker(t, chainConfig, engine) + defer w.close() + + // Ensure snapshot has been updated. + time.Sleep(100 * time.Millisecond) + block, state := w.pending() + if block.NumberU64() != 1 { + t.Errorf("block number mismatch, has %d, want %d", block.NumberU64(), 1) + } + if balance := state.GetBalance(acc1Addr); balance.Cmp(big.NewInt(1000)) != 0 { + t.Errorf("account balance mismatch, has %d, want %d", balance, 1000) + } + b.txPool.AddLocals(newTxs) + // Ensure the new tx events has been processed + time.Sleep(100 * time.Millisecond) + block, state = w.pending() + if balance := state.GetBalance(acc1Addr); balance.Cmp(big.NewInt(2000)) != 0 { + t.Errorf("account balance mismatch, has %d, want %d", balance, 2000) + } +} + +func TestEmptyWorkEthash(t *testing.T) { + testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) +} +func TestEmptyWorkClique(t *testing.T) { + testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase())) +} + +func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { + defer engine.Close() + + w, _ := newTestWorker(t, chainConfig, engine) + defer w.close() + + var ( + taskCh = make(chan struct{}, 2) + taskIndex int + ) + + checkEqual := func(t *testing.T, task *task, index int) { + receiptLen, balance := 0, big.NewInt(0) + if index == 1 { + receiptLen, balance = 1, big.NewInt(1000) + } + if len(task.receipts) != receiptLen { + t.Errorf("receipt number mismatch has %d, want %d", len(task.receipts), receiptLen) + } + if task.state.GetBalance(acc1Addr).Cmp(balance) != 0 { + t.Errorf("account balance mismatch has %d, want %d", task.state.GetBalance(acc1Addr), balance) + } + } + + w.newTaskHook = func(task *task) { + if task.block.NumberU64() == 1 { + checkEqual(t, task, taskIndex) + taskIndex += 1 + taskCh <- struct{}{} + } + } + w.fullTaskInterval = func() { + time.Sleep(100 * time.Millisecond) + } + + // Ensure worker has finished initialization + for { + b := w.pendingBlock() + if b != nil && b.NumberU64() == 1 { + break + } + } + + w.start() + for i := 0; i < 2; i += 1 { + to := time.NewTimer(time.Second) + select { + case <-taskCh: + case <-to.C: + t.Error("new task timeout") + } + } +}