miner: move agent logic to worker (#17351)

* miner: move agent logic to worker

* miner: polish

* core: persist block before reorg
pull/17401/head
gary rong 6 years ago committed by Péter Szilágyi
parent e0e0e53401
commit a1783d1697
  1. 7
      core/blockchain.go
  2. 116
      miner/agent.go
  3. 64
      miner/miner.go
  4. 721
      miner/worker.go
  5. 212
      miner/worker_test.go

@ -899,9 +899,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
return NonStatTy, err
}
// Write other block data using a batch.
batch := bc.db.NewBatch()
rawdb.WriteBlock(batch, block)
rawdb.WriteBlock(bc.db, block)
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
@ -955,6 +953,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
}
}
}
// Write other block data using a batch.
batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
// If the total difficulty is higher than our known, add it to the canonical chain

@ -1,116 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/log"
)
type CpuAgent struct {
mu sync.Mutex
taskCh chan *Package
returnCh chan<- *Package
stop chan struct{}
quitCurrentOp chan struct{}
chain consensus.ChainReader
engine consensus.Engine
started int32 // started indicates whether the agent is currently started
}
func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent {
agent := &CpuAgent{
chain: chain,
engine: engine,
stop: make(chan struct{}, 1),
taskCh: make(chan *Package, 1),
}
return agent
}
func (self *CpuAgent) AssignTask(p *Package) {
if atomic.LoadInt32(&self.started) == 1 {
self.taskCh <- p
}
}
func (self *CpuAgent) DeliverTo(ch chan<- *Package) { self.returnCh = ch }
func (self *CpuAgent) Start() {
if !atomic.CompareAndSwapInt32(&self.started, 0, 1) {
return // agent already started
}
go self.update()
}
func (self *CpuAgent) Stop() {
if !atomic.CompareAndSwapInt32(&self.started, 1, 0) {
return // agent already stopped
}
self.stop <- struct{}{}
done:
// Empty work channel
for {
select {
case <-self.taskCh:
default:
break done
}
}
}
func (self *CpuAgent) update() {
out:
for {
select {
case p := <-self.taskCh:
self.mu.Lock()
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
}
self.quitCurrentOp = make(chan struct{})
go self.mine(p, self.quitCurrentOp)
self.mu.Unlock()
case <-self.stop:
self.mu.Lock()
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
self.quitCurrentOp = nil
}
self.mu.Unlock()
break out
}
}
}
func (self *CpuAgent) mine(p *Package, stop <-chan struct{}) {
var err error
if p.Block, err = self.engine.Seal(self.chain, p.Block, stop); p.Block != nil {
log.Info("Successfully sealed new block", "number", p.Block.Number(), "hash", p.Block.Hash())
self.returnCh <- p
} else {
if err != nil {
log.Warn("Block sealing failed", "err", err)
}
self.returnCh <- nil
}
}

@ -21,14 +21,12 @@ import (
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
@ -36,10 +34,8 @@ import (
// Backend wraps all methods required for mining.
type Backend interface {
AccountManager() *accounts.Manager
BlockChain() *core.BlockChain
TxPool() *core.TxPool
ChainDb() ethdb.Database
}
// Miner creates blocks and searches for proof-of-work values.
@ -49,6 +45,7 @@ type Miner struct {
coinbase common.Address
eth Backend
engine consensus.Engine
exitCh chan struct{}
canStart int32 // can start indicates whether we can start the mining operation
shouldStart int32 // should start indicates whether we should start after sync
@ -59,10 +56,10 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
worker: newWorker(config, engine, eth, mux),
canStart: 1,
}
miner.Register(NewCpuAgent(eth.BlockChain(), engine))
go miner.update()
return miner
@ -74,28 +71,35 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con
// and halt your mining operation for as long as the DOS continues.
func (self *Miner) update() {
events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
for ev := range events.Chan() {
switch ev.Data.(type) {
case downloader.StartEvent:
atomic.StoreInt32(&self.canStart, 0)
if self.Mining() {
self.Stop()
atomic.StoreInt32(&self.shouldStart, 1)
log.Info("Mining aborted due to sync")
}
case downloader.DoneEvent, downloader.FailedEvent:
shouldStart := atomic.LoadInt32(&self.shouldStart) == 1
defer events.Unsubscribe()
atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
for {
select {
case ev := <-events.Chan():
if ev == nil {
return
}
switch ev.Data.(type) {
case downloader.StartEvent:
atomic.StoreInt32(&self.canStart, 0)
if self.Mining() {
self.Stop()
atomic.StoreInt32(&self.shouldStart, 1)
log.Info("Mining aborted due to sync")
}
case downloader.DoneEvent, downloader.FailedEvent:
shouldStart := atomic.LoadInt32(&self.shouldStart) == 1
atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
}
// stop immediately and ignore all further pending events
return
}
// unsubscribe. we're only interested in this event once
events.Unsubscribe()
// stop immediately and ignore all further pending events
break out
case <-self.exitCh:
return
}
}
}
@ -109,7 +113,6 @@ func (self *Miner) Start(coinbase common.Address) {
return
}
self.worker.start()
self.worker.commitNewWork()
}
func (self *Miner) Stop() {
@ -117,12 +120,9 @@ func (self *Miner) Stop() {
atomic.StoreInt32(&self.shouldStart, 0)
}
func (self *Miner) Register(agent Agent) {
self.worker.register(agent)
}
func (self *Miner) Unregister(agent Agent) {
self.worker.unregister(agent)
func (self *Miner) Close() {
self.worker.close()
close(self.exitCh)
}
func (self *Miner) Mining() bool {

@ -32,16 +32,14 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)
const (
resultQueueSize = 10
miningLogAtDepth = 5
// resultQueueSize is the size of channel listening to sealing result.
resultQueueSize = 10
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
@ -49,17 +47,10 @@ const (
chainHeadChanSize = 10
// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10
miningLogAtDepth = 5
)
// Agent can register themselves with the worker
type Agent interface {
AssignTask(*Package)
DeliverTo(chan<- *Package)
Start()
Stop()
}
// Env is the workers current environment and holds all of the current state information.
// Env is the worker's current environment and holds all of the current state information.
type Env struct {
config *params.ChainConfig
signer types.Signer
@ -74,25 +65,124 @@ type Env struct {
header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
}
createdAt time.Time
func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
snap := env.state.Snapshot()
receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})
if err != nil {
env.state.RevertToSnapshot(snap)
return err, nil
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
return nil, receipt.Logs
}
// Package contains all information for consensus engine sealing and result submitting.
type Package struct {
Receipts []*types.Receipt
State *state.StateDB
Block *types.Block
func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
}
var coalescedLogs []*types.Log
for {
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !env.config.IsEIP155(env.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block)
txs.Pop()
continue
}
// Start executing the transaction
env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
env.tcount++
txs.Shift()
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txs.Shift()
}
}
if len(coalescedLogs) > 0 || env.tcount > 0 {
// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
cpy := make([]*types.Log, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(types.Log)
*cpy[i] = *l
}
go func(logs []*types.Log, tcount int) {
if len(logs) > 0 {
mux.Post(core.PendingLogsEvent{Logs: logs})
}
if tcount > 0 {
mux.Post(core.PendingStateEvent{})
}
}(cpy, env.tcount)
}
}
// worker is the main object which takes care of applying messages to the new state
// task contains all information for consensus engine sealing and result submitting.
type task struct {
receipts []*types.Receipt
state *state.StateDB
block *types.Block
createdAt time.Time
}
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
config *params.ChainConfig
engine consensus.Engine
eth Backend
chain *core.BlockChain
mu sync.Mutex
// update loop
// Subscriptions
mux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
@ -101,31 +191,30 @@ type worker struct {
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
agents map[Agent]struct{}
recv chan *Package
// Channels
newWork chan struct{}
taskCh chan *task
resultCh chan *task
exitCh chan struct{}
eth Backend
chain *core.BlockChain
proc core.Validator
chainDb ethdb.Database
current *Env // An environment for current running cycle.
possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations.
mu sync.RWMutex // The lock used to protect the coinbase and extra fields
coinbase common.Address
extra []byte
currentMu sync.Mutex
current *Env
snapshotMu sync.RWMutex
snapshotMu sync.RWMutex // The lock used to protect the block snapshot and state snapshot
snapshotBlock *types.Block
snapshotState *state.StateDB
uncleMu sync.Mutex
possibleUncles map[common.Hash]*types.Block
unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
// Test hooks
newTaskHook func(*task) // Method to call upon receiving a new sealing task
fullTaskInterval func() // Method to call before pushing the full sealing task
}
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
@ -134,220 +223,274 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
engine: engine,
eth: eth,
mux: mux,
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
recv: make(chan *Package, resultQueueSize),
chain: eth.BlockChain(),
proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWork: make(chan struct{}, 1),
taskCh: make(chan *task),
resultCh: make(chan *task, resultQueueSize),
exitCh: make(chan struct{}),
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
go worker.update()
go worker.wait()
worker.commitNewWork()
go worker.mainLoop()
go worker.resultLoop()
go worker.taskLoop()
// Submit first work to initialize pending state.
worker.newWork <- struct{}{}
return worker
}
func (self *worker) setEtherbase(addr common.Address) {
self.mu.Lock()
defer self.mu.Unlock()
self.coinbase = addr
// setEtherbase sets the etherbase used to initialize the block coinbase field.
func (w *worker) setEtherbase(addr common.Address) {
w.mu.Lock()
defer w.mu.Unlock()
w.coinbase = addr
}
func (self *worker) setExtra(extra []byte) {
self.mu.Lock()
defer self.mu.Unlock()
self.extra = extra
// setExtra sets the content used to initialize the block extra field.
func (w *worker) setExtra(extra []byte) {
w.mu.Lock()
defer w.mu.Unlock()
w.extra = extra
}
func (self *worker) pending() (*types.Block, *state.StateDB) {
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
// return a snapshot to avoid contention on currentMu mutex
self.snapshotMu.RLock()
defer self.snapshotMu.RUnlock()
return self.snapshotBlock, self.snapshotState.Copy()
w.snapshotMu.RLock()
defer w.snapshotMu.RUnlock()
if w.snapshotState == nil {
return nil, nil
}
return w.snapshotBlock, w.snapshotState.Copy()
}
func (self *worker) pendingBlock() *types.Block {
// pendingBlock returns pending block.
func (w *worker) pendingBlock() *types.Block {
// return a snapshot to avoid contention on currentMu mutex
self.snapshotMu.RLock()
defer self.snapshotMu.RUnlock()
return self.snapshotBlock
w.snapshotMu.RLock()
defer w.snapshotMu.RUnlock()
return w.snapshotBlock
}
func (self *worker) start() {
self.mu.Lock()
defer self.mu.Unlock()
atomic.StoreInt32(&self.running, 1)
for agent := range self.agents {
agent.Start()
}
// start sets the running status as 1 and triggers new work submitting.
func (w *worker) start() {
atomic.StoreInt32(&w.running, 1)
w.newWork <- struct{}{}
}
func (self *worker) stop() {
self.mu.Lock()
defer self.mu.Unlock()
atomic.StoreInt32(&self.running, 0)
for agent := range self.agents {
agent.Stop()
}
// stop sets the running status as 0.
func (w *worker) stop() {
atomic.StoreInt32(&w.running, 0)
}
func (self *worker) isRunning() bool {
return atomic.LoadInt32(&self.running) == 1
// isRunning returns an indicator whether worker is running or not.
func (w *worker) isRunning() bool {
return atomic.LoadInt32(&w.running) == 1
}
func (self *worker) register(agent Agent) {
self.mu.Lock()
defer self.mu.Unlock()
self.agents[agent] = struct{}{}
agent.DeliverTo(self.recv)
if self.isRunning() {
agent.Start()
// close terminates all background threads maintained by the worker and cleans up buffered channels.
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
close(w.exitCh)
// Clean up buffered channels
for empty := false; !empty; {
select {
case <-w.resultCh:
default:
empty = true
}
}
}
func (self *worker) unregister(agent Agent) {
self.mu.Lock()
defer self.mu.Unlock()
delete(self.agents, agent)
agent.Stop()
}
func (self *worker) update() {
defer self.txsSub.Unsubscribe()
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
for {
// A real event arrived, process interesting content
select {
// Handle ChainHeadEvent
case <-self.chainHeadCh:
self.commitNewWork()
// Handle ChainSideEvent
case ev := <-self.chainSideCh:
self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
// Handle NewTxsEvent
case ev := <-self.txsCh:
case <-w.newWork:
// Submit a work when the worker is created or started.
w.commitNewWork()
case <-w.chainHeadCh:
// Resubmit a work for new cycle once worker receives chain head event.
w.commitNewWork()
case ev := <-w.chainSideCh:
// Add side block to possible uncle block set.
w.possibleUncles[ev.Block.Hash()] = ev.Block
case ev := <-w.txsCh:
// Apply transactions to the pending state if we're not mining.
//
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// be automatically eliminated.
if !self.isRunning() {
self.currentMu.Lock()
if !w.isRunning() && w.current != nil {
w.mu.Lock()
coinbase := w.coinbase
w.mu.Unlock()
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
acc, _ := types.Sender(self.current.signer, tx)
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], tx)
}
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.updateSnapshot()
self.currentMu.Unlock()
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
w.current.commitTransactions(w.mux, txset, w.chain, coinbase)
w.updateSnapshot()
} else {
// If we're mining, but nothing is being processed, wake on new transactions
if self.config.Clique != nil && self.config.Clique.Period == 0 {
self.commitNewWork()
if w.config.Clique != nil && w.config.Clique.Period == 0 {
w.commitNewWork()
}
}
// System stopped
case <-self.txsSub.Err():
case <-w.exitCh:
return
case <-self.chainHeadSub.Err():
case <-w.txsSub.Err():
return
case <-self.chainSideSub.Err():
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
return
}
}
}
func (self *worker) wait() {
// seal pushes a sealing task to consensus engine and submits the result.
func (w *worker) seal(t *task, stop <-chan struct{}) {
var (
err error
res *task
)
if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil {
log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(),
"elapsed", common.PrettyDuration(time.Since(t.createdAt)))
res = t
} else {
if err != nil {
log.Warn("Block sealing failed", "err", err)
}
res = nil
}
select {
case w.resultCh <- res:
case <-w.exitCh:
}
}
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
var stopCh chan struct{}
// interrupt aborts the in-flight sealing task.
interrupt := func() {
if stopCh != nil {
close(stopCh)
stopCh = nil
}
}
for {
for result := range self.recv {
select {
case task := <-w.taskCh:
if w.newTaskHook != nil {
w.newTaskHook(task)
}
interrupt()
stopCh = make(chan struct{})
go w.seal(task, stopCh)
case <-w.exitCh:
interrupt()
return
}
}
}
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
for {
select {
case result := <-w.resultCh:
if result == nil {
continue
}
block := result.Block
block := result.block
// Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created.
for _, r := range result.Receipts {
for _, r := range result.receipts {
for _, l := range r.Logs {
l.BlockHash = block.Hash()
}
}
for _, log := range result.State.Logs() {
for _, log := range result.state.Logs() {
log.BlockHash = block.Hash()
}
self.currentMu.Lock()
stat, err := self.chain.WriteBlockWithState(block, result.Receipts, result.State)
self.currentMu.Unlock()
// Commit block and state to database.
stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
// Broadcast the block and announce chain insertion event
self.mux.Post(core.NewMinedBlockEvent{Block: block})
w.mux.Post(core.NewMinedBlockEvent{Block: block})
var (
events []interface{}
logs = result.State.Logs()
logs = result.state.Logs()
)
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
switch stat {
case core.CanonStatTy:
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
events = append(events, core.ChainHeadEvent{Block: block})
case core.SideStatTy:
events = append(events, core.ChainSideEvent{Block: block})
}
self.chain.PostChainEvents(events, logs)
w.chain.PostChainEvents(events, logs)
// Insert the block into the set of pending ones to wait for confirmations
self.unconfirmed.Insert(block.NumberU64(), block.Hash())
}
}
}
// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())
// push sends a new work task to currently live miner agents.
func (self *worker) push(p *Package) {
for agent := range self.agents {
agent.AssignTask(p)
case <-w.exitCh:
return
}
}
}
// makeCurrent creates a new environment for the current cycle.
func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error {
state, err := self.chain.StateAt(parent.Root())
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
state, err := w.chain.StateAt(parent.Root())
if err != nil {
return err
}
env := &Env{
config: self.config,
signer: types.NewEIP155Signer(self.config.ChainID),
config: w.config,
signer: types.NewEIP155Signer(w.config.ChainID),
state: state,
ancestors: mapset.NewSet(),
family: mapset.NewSet(),
uncles: mapset.NewSet(),
header: header,
createdAt: time.Now(),
}
// when 08 is processed ancestors contain 07 (quick block)
for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) {
for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
for _, uncle := range ancestor.Uncles() {
env.family.Add(uncle.Hash())
}
@ -357,20 +500,63 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error
// Keep track of transactions which return errors so they can be removed
env.tcount = 0
self.current = env
w.current = env
return nil
}
func (self *worker) commitNewWork() {
self.mu.Lock()
defer self.mu.Unlock()
self.uncleMu.Lock()
defer self.uncleMu.Unlock()
self.currentMu.Lock()
defer self.currentMu.Unlock()
// commitUncle adds the given block to uncle block set, returns error if failed to add.
func (w *worker) commitUncle(env *Env, uncle *types.Header) error {
hash := uncle.Hash()
if env.uncles.Contains(hash) {
return fmt.Errorf("uncle not unique")
}
if !env.ancestors.Contains(uncle.ParentHash) {
return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4])
}
if env.family.Contains(hash) {
return fmt.Errorf("uncle already in family (%x)", hash)
}
env.uncles.Add(uncle.Hash())
return nil
}
// updateSnapshot updates pending snapshot block and state.
// Note this function assumes the current variable is thread safe.
func (w *worker) updateSnapshot() {
w.snapshotMu.Lock()
defer w.snapshotMu.Unlock()
var uncles []*types.Header
w.current.uncles.Each(func(item interface{}) bool {
hash, ok := item.(common.Hash)
if !ok {
return false
}
uncle, exist := w.possibleUncles[hash]
if !exist {
return false
}
uncles = append(uncles, uncle.Header())
return true
})
w.snapshotBlock = types.NewBlock(
w.current.header,
w.current.txs,
uncles,
w.current.receipts,
)
w.snapshotState = w.current.state.Copy()
}
// commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork() {
w.mu.RLock()
defer w.mu.RUnlock()
tstart := time.Now()
parent := self.chain.CurrentBlock()
parent := w.chain.CurrentBlock()
tstamp := tstart.Unix()
if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
@ -388,28 +574,28 @@ func (self *worker) commitNewWork() {
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent),
Extra: self.extra,
Extra: w.extra,
Time: big.NewInt(tstamp),
}
// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
if self.isRunning() {
if self.coinbase == (common.Address{}) {
if w.isRunning() {
if w.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
header.Coinbase = self.coinbase
header.Coinbase = w.coinbase
}
if err := self.engine.Prepare(self.chain, header); err != nil {
if err := w.engine.Prepare(w.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
// If we are care about TheDAO hard-fork check whether to override the extra-data or not
if daoBlock := self.config.DAOForkBlock; daoBlock != nil {
if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
// Check whether the block is among the fork extra-override range
limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
// Depending whether we support or oppose the fork, override differently
if self.config.DAOForkSupport {
if w.config.DAOForkSupport {
header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
@ -417,14 +603,14 @@ func (self *worker) commitNewWork() {
}
}
// Could potentially happen if starting to mine in an odd state.
err := self.makeCurrent(parent, header)
err := w.makeCurrent(parent, header)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
}
// Create the current work task and check any fork transitions needed
env := self.current
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
env := w.current
if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(env.state)
}
@ -433,11 +619,11 @@ func (self *worker) commitNewWork() {
uncles []*types.Header
badUncles []common.Hash
)
for hash, uncle := range self.possibleUncles {
for hash, uncle := range w.possibleUncles {
if len(uncles) == 2 {
break
}
if err := self.commitUncle(env, uncle.Header()); err != nil {
if err := w.commitUncle(env, uncle.Header()); err != nil {
log.Trace("Bad uncle found and will be removed", "hash", hash)
log.Trace(fmt.Sprint(uncle))
@ -448,184 +634,73 @@ func (self *worker) commitNewWork() {
}
}
for _, hash := range badUncles {
delete(self.possibleUncles, hash)
delete(w.possibleUncles, hash)
}
var (
emptyBlock *types.Block
fullBlock *types.Block
emptyBlock, fullBlock *types.Block
emptyState, fullState *state.StateDB
)
// Create an empty block based on temporary copied state for sealing in advance without waiting block
// execution finished.
emptyState := env.state.Copy()
if emptyBlock, err = self.engine.Finalize(self.chain, header, emptyState, nil, uncles, nil); err != nil {
emptyState = env.state.Copy()
if emptyBlock, err = w.engine.Finalize(w.chain, header, emptyState, nil, uncles, nil); err != nil {
log.Error("Failed to finalize block for temporary sealing", "err", err)
} else {
// Push empty work in advance without applying pending transaction.
// The reason is transactions execution can cost a lot and sealer need to
// take advantage of this part time.
if self.isRunning() {
log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles))
self.push(&Package{nil, emptyState, emptyBlock})
if w.isRunning() {
select {
case w.taskCh <- &task{receipts: nil, state: emptyState, block: emptyBlock, createdAt: time.Now()}:
log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles))
case <-w.exitCh:
log.Info("Worker has exited")
return
}
}
}
// Fill the block with all available pending transactions.
pending, err := self.eth.TxPool().Pending()
pending, err := w.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
env.commitTransactions(self.mux, txs, self.chain, self.coinbase)
// Short circuit if there is no available pending transactions
if len(pending) == 0 {
w.updateSnapshot()
return
}
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending)
env.commitTransactions(w.mux, txs, w.chain, w.coinbase)
// Create the full block to seal with the consensus engine
if fullBlock, err = self.engine.Finalize(self.chain, header, env.state, env.txs, uncles, env.receipts); err != nil {
fullState = env.state.Copy()
if fullBlock, err = w.engine.Finalize(w.chain, header, fullState, env.txs, uncles, env.receipts); err != nil {
log.Error("Failed to finalize block for sealing", "err", err)
return
}
// We only care about logging if we're actually mining.
if self.isRunning() {
log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(fullBlock.NumberU64() - 1)
self.push(&Package{env.receipts, env.state, fullBlock})
}
self.updateSnapshot()
}
func (self *worker) commitUncle(env *Env, uncle *types.Header) error {
hash := uncle.Hash()
if env.uncles.Contains(hash) {
return fmt.Errorf("uncle not unique")
}
if !env.ancestors.Contains(uncle.ParentHash) {
return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4])
}
if env.family.Contains(hash) {
return fmt.Errorf("uncle already in family (%x)", hash)
}
env.uncles.Add(uncle.Hash())
return nil
}
func (self *worker) updateSnapshot() {
self.snapshotMu.Lock()
defer self.snapshotMu.Unlock()
var uncles []*types.Header
self.current.uncles.Each(func(item interface{}) bool {
if header, ok := item.(*types.Header); ok {
uncles = append(uncles, header)
return true
}
return false
})
self.snapshotBlock = types.NewBlock(
self.current.header,
self.current.txs,
uncles,
self.current.receipts,
)
self.snapshotState = self.current.state.Copy()
}
func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
// Deep copy receipts here to avoid interaction between different tasks.
cpy := make([]*types.Receipt, len(env.receipts))
for i, l := range env.receipts {
cpy[i] = new(types.Receipt)
*cpy[i] = *l
}
var coalescedLogs []*types.Log
for {
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !env.config.IsEIP155(env.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block)
txs.Pop()
continue
}
// Start executing the transaction
env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
env.tcount++
txs.Shift()
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txs.Shift()
// We only care about logging if we're actually mining.
if w.isRunning() {
if w.fullTaskInterval != nil {
w.fullTaskInterval()
}
}
if len(coalescedLogs) > 0 || env.tcount > 0 {
// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
cpy := make([]*types.Log, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(types.Log)
*cpy[i] = *l
select {
case w.taskCh <- &task{receipts: cpy, state: fullState, block: fullBlock, createdAt: time.Now()}:
w.unconfirmed.Shift(fullBlock.NumberU64() - 1)
log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
case <-w.exitCh:
log.Info("Worker has exited")
}
go func(logs []*types.Log, tcount int) {
if len(logs) > 0 {
mux.Post(core.PendingLogsEvent{Logs: logs})
}
if tcount > 0 {
mux.Post(core.PendingStateEvent{})
}
}(cpy, env.tcount)
}
}
func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
snap := env.state.Snapshot()
receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})
if err != nil {
env.state.RevertToSnapshot(snap)
return err, nil
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
return nil, receipt.Logs
w.updateSnapshot()
}

@ -0,0 +1,212 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)
var (
// Test chain configurations
testTxPoolConfig core.TxPoolConfig
ethashChainConfig *params.ChainConfig
cliqueChainConfig *params.ChainConfig
// Test accounts
testBankKey, _ = crypto.GenerateKey()
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds = big.NewInt(1000000000000000000)
acc1Key, _ = crypto.GenerateKey()
acc1Addr = crypto.PubkeyToAddress(acc1Key.PublicKey)
// Test transactions
pendingTxs []*types.Transaction
newTxs []*types.Transaction
)
func init() {
testTxPoolConfig = core.DefaultTxPoolConfig
testTxPoolConfig.Journal = ""
ethashChainConfig = params.TestChainConfig
cliqueChainConfig = params.TestChainConfig
cliqueChainConfig.Clique = &params.CliqueConfig{
Period: 1,
Epoch: 30000,
}
tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
pendingTxs = append(pendingTxs, tx1)
tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
newTxs = append(newTxs, tx2)
}
// testWorkerBackend implements worker.Backend interfaces and wraps all information needed during the testing.
type testWorkerBackend struct {
db ethdb.Database
txPool *core.TxPool
chain *core.BlockChain
testTxFeed event.Feed
}
func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) *testWorkerBackend {
var (
db = ethdb.NewMemDatabase()
gspec = core.Genesis{
Config: chainConfig,
Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
}
)
switch engine.(type) {
case *clique.Clique:
gspec.ExtraData = make([]byte, 32+common.AddressLength+65)
copy(gspec.ExtraData[32:], testBankAddress[:])
case *ethash.Ethash:
default:
t.Fatal("unexpect consensus engine type")
}
gspec.MustCommit(db)
chain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain)
return &testWorkerBackend{
db: db,
chain: chain,
txPool: txpool,
}
}
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
b.chain.PostChainEvents(events, nil)
}
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine)
backend.txPool.AddLocals(pendingTxs)
w := newWorker(chainConfig, engine, backend, new(event.TypeMux))
w.setEtherbase(testBankAddress)
return w, backend
}
func TestPendingStateAndBlockEthash(t *testing.T) {
testPendingStateAndBlock(t, ethashChainConfig, ethash.NewFaker())
}
func TestPendingStateAndBlockClique(t *testing.T) {
testPendingStateAndBlock(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase()))
}
func testPendingStateAndBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()
w, b := newTestWorker(t, chainConfig, engine)
defer w.close()
// Ensure snapshot has been updated.
time.Sleep(100 * time.Millisecond)
block, state := w.pending()
if block.NumberU64() != 1 {
t.Errorf("block number mismatch, has %d, want %d", block.NumberU64(), 1)
}
if balance := state.GetBalance(acc1Addr); balance.Cmp(big.NewInt(1000)) != 0 {
t.Errorf("account balance mismatch, has %d, want %d", balance, 1000)
}
b.txPool.AddLocals(newTxs)
// Ensure the new tx events has been processed
time.Sleep(100 * time.Millisecond)
block, state = w.pending()
if balance := state.GetBalance(acc1Addr); balance.Cmp(big.NewInt(2000)) != 0 {
t.Errorf("account balance mismatch, has %d, want %d", balance, 2000)
}
}
func TestEmptyWorkEthash(t *testing.T) {
testEmptyWork(t, ethashChainConfig, ethash.NewFaker())
}
func TestEmptyWorkClique(t *testing.T) {
testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase()))
}
func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()
w, _ := newTestWorker(t, chainConfig, engine)
defer w.close()
var (
taskCh = make(chan struct{}, 2)
taskIndex int
)
checkEqual := func(t *testing.T, task *task, index int) {
receiptLen, balance := 0, big.NewInt(0)
if index == 1 {
receiptLen, balance = 1, big.NewInt(1000)
}
if len(task.receipts) != receiptLen {
t.Errorf("receipt number mismatch has %d, want %d", len(task.receipts), receiptLen)
}
if task.state.GetBalance(acc1Addr).Cmp(balance) != 0 {
t.Errorf("account balance mismatch has %d, want %d", task.state.GetBalance(acc1Addr), balance)
}
}
w.newTaskHook = func(task *task) {
if task.block.NumberU64() == 1 {
checkEqual(t, task, taskIndex)
taskIndex += 1
taskCh <- struct{}{}
}
}
w.fullTaskInterval = func() {
time.Sleep(100 * time.Millisecond)
}
// Ensure worker has finished initialization
for {
b := w.pendingBlock()
if b != nil && b.NumberU64() == 1 {
break
}
}
w.start()
for i := 0; i < 2; i += 1 {
to := time.NewTimer(time.Second)
select {
case <-taskCh:
case <-to.C:
t.Error("new task timeout")
}
}
}
Loading…
Cancel
Save