From b2c644ffb5c283a171ddf3889693673939917541 Mon Sep 17 00:00:00 2001 From: gary rong Date: Wed, 22 Aug 2018 03:56:54 +0800 Subject: [PATCH] cmd, eth, miner: make recommit configurable (#17444) * cmd, eth, miner: make recommit configurable * cmd, eth, les, miner: polish a bit * miner: filter duplicate sealing work * cmd: remove uncessary conversion * miner: avoid microptimization in favor of cleaner code --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 22 ++-- eth/api.go | 6 ++ eth/backend.go | 8 +- eth/config.go | 15 +-- eth/gen_config.go | 53 ++++++++-- internal/web3ext/web3ext.go | 5 + les/backend.go | 2 +- miner/miner.go | 10 +- miner/worker.go | 202 +++++++++++++++++++++++++++++------- miner/worker_test.go | 106 ++++++++++++++++++- 12 files changed, 360 insertions(+), 71 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index a063860513..2e87bb8200 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -106,6 +106,7 @@ var ( utils.MinerLegacyEtherbaseFlag, utils.MinerExtraDataFlag, utils.MinerLegacyExtraDataFlag, + utils.MinerRecommitIntervalFlag, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV5Flag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 9e18f70472..674c5d9015 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -190,6 +190,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.MinerGasTargetFlag, utils.MinerEtherbaseFlag, utils.MinerExtraDataFlag, + utils.MinerRecommitIntervalFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e3a8cc2eac..7317655836 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -335,12 +335,12 @@ var ( MinerGasPriceFlag = BigFlag{ Name: "miner.gasprice", Usage: "Minimal gas price for mining a transactions", - Value: eth.DefaultConfig.GasPrice, + Value: eth.DefaultConfig.MinerGasPrice, } MinerLegacyGasPriceFlag = BigFlag{ Name: "gasprice", Usage: "Minimal gas price for mining a transactions (deprecated, use --miner.gasprice)", - Value: eth.DefaultConfig.GasPrice, + Value: eth.DefaultConfig.MinerGasPrice, } MinerEtherbaseFlag = cli.StringFlag{ Name: "miner.etherbase", @@ -360,6 +360,11 @@ var ( Name: "extradata", Usage: "Block extra data set by the miner (default = client version, deprecated, use --miner.extradata)", } + MinerRecommitIntervalFlag = cli.DurationFlag{ + Name: "miner.recommit", + Usage: "Time interval to recreate the block being mined.", + Value: 3 * time.Second, + } // Account settings UnlockedAccountFlag = cli.StringFlag{ Name: "unlock", @@ -1124,16 +1129,19 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name) } if ctx.GlobalIsSet(MinerLegacyExtraDataFlag.Name) { - cfg.ExtraData = []byte(ctx.GlobalString(MinerLegacyExtraDataFlag.Name)) + cfg.MinerExtraData = []byte(ctx.GlobalString(MinerLegacyExtraDataFlag.Name)) } if ctx.GlobalIsSet(MinerExtraDataFlag.Name) { - cfg.ExtraData = []byte(ctx.GlobalString(MinerExtraDataFlag.Name)) + cfg.MinerExtraData = []byte(ctx.GlobalString(MinerExtraDataFlag.Name)) } if ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) { - cfg.GasPrice = GlobalBig(ctx, MinerLegacyGasPriceFlag.Name) + cfg.MinerGasPrice = GlobalBig(ctx, MinerLegacyGasPriceFlag.Name) } if ctx.GlobalIsSet(MinerGasPriceFlag.Name) { - cfg.GasPrice = GlobalBig(ctx, MinerGasPriceFlag.Name) + cfg.MinerGasPrice = GlobalBig(ctx, MinerGasPriceFlag.Name) + } + if ctx.GlobalIsSet(MinerRecommitIntervalFlag.Name) { + cfg.MinerRecommit = ctx.Duration(MinerRecommitIntervalFlag.Name) } if ctx.GlobalIsSet(VMEnableDebugFlag.Name) { // TODO(fjl): force-enable this in --dev mode @@ -1176,7 +1184,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { cfg.Genesis = core.DeveloperGenesisBlock(uint64(ctx.GlobalInt(DeveloperPeriodFlag.Name)), developer.Address) if !ctx.GlobalIsSet(MinerGasPriceFlag.Name) && !ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) { - cfg.GasPrice = big.NewInt(1) + cfg.MinerGasPrice = big.NewInt(1) } } // TODO(fjl): move trie cache generations into config diff --git a/eth/api.go b/eth/api.go index c1fbcb6d40..4b0ba8edbd 100644 --- a/eth/api.go +++ b/eth/api.go @@ -25,6 +25,7 @@ import ( "math/big" "os" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -160,6 +161,11 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool { return true } +// SetRecommitInterval updates the interval for miner sealing work recommitting. +func (api *PrivateMinerAPI) SetRecommitInterval(interval int) { + api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond) +} + // GetHashrate returns the current hashrate of the miner. func (api *PrivateMinerAPI) GetHashrate() uint64 { return api.e.miner.HashRate() diff --git a/eth/backend.go b/eth/backend.go index 588b782568..648175acfd 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -127,7 +127,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb), shutdownChan: make(chan bool), networkID: config.NetworkId, - gasPrice: config.GasPrice, + gasPrice: config.MinerGasPrice, etherbase: config.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms), @@ -167,13 +167,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { return nil, err } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) - eth.miner.SetExtra(makeExtraData(config.ExtraData)) + eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit) + eth.miner.SetExtra(makeExtraData(config.MinerExtraData)) eth.APIBackend = &EthAPIBackend{eth, nil} gpoParams := config.GPO if gpoParams.Default == nil { - gpoParams.Default = config.GasPrice + gpoParams.Default = config.MinerGasPrice } eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) diff --git a/eth/config.go b/eth/config.go index 0c82f29232..cbd02416be 100644 --- a/eth/config.go +++ b/eth/config.go @@ -48,7 +48,7 @@ var DefaultConfig = Config{ DatabaseCache: 768, TrieCache: 256, TrieTimeout: 60 * time.Minute, - GasPrice: big.NewInt(18 * params.Shannon), + MinerGasPrice: big.NewInt(18 * params.Shannon), TxPool: core.DefaultTxPoolConfig, GPO: gasprice.Config{ @@ -95,11 +95,12 @@ type Config struct { TrieTimeout time.Duration // Mining-related options - Etherbase common.Address `toml:",omitempty"` - MinerThreads int `toml:",omitempty"` - MinerNotify []string `toml:",omitempty"` - ExtraData []byte `toml:",omitempty"` - GasPrice *big.Int + Etherbase common.Address `toml:",omitempty"` + MinerThreads int `toml:",omitempty"` + MinerNotify []string `toml:",omitempty"` + MinerExtraData []byte `toml:",omitempty"` + MinerGasPrice *big.Int + MinerRecommit time.Duration // Ethash options Ethash ethash.Config @@ -118,5 +119,5 @@ type Config struct { } type configMarshaling struct { - ExtraData hexutil.Bytes + MinerExtraData hexutil.Bytes } diff --git a/eth/gen_config.go b/eth/gen_config.go index 4f2e82d941..62556be7e1 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -4,6 +4,7 @@ package eth import ( "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -15,20 +16,26 @@ import ( var _ = (*configMarshaling)(nil) +// MarshalTOML marshals as TOML. func (c Config) MarshalTOML() (interface{}, error) { type Config struct { Genesis *core.Genesis `toml:",omitempty"` NetworkId uint64 SyncMode downloader.SyncMode + NoPruning bool LightServ int `toml:",omitempty"` LightPeers int `toml:",omitempty"` SkipBcVersionCheck bool `toml:"-"` DatabaseHandles int `toml:"-"` DatabaseCache int + TrieCache int + TrieTimeout time.Duration Etherbase common.Address `toml:",omitempty"` MinerThreads int `toml:",omitempty"` - ExtraData hexutil.Bytes `toml:",omitempty"` - GasPrice *big.Int + MinerNotify []string `toml:",omitempty"` + MinerExtraData hexutil.Bytes `toml:",omitempty"` + MinerGasPrice *big.Int + MinerRecommit time.Duration Ethash ethash.Config TxPool core.TxPoolConfig GPO gasprice.Config @@ -39,15 +46,20 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.Genesis = c.Genesis enc.NetworkId = c.NetworkId enc.SyncMode = c.SyncMode + enc.NoPruning = c.NoPruning enc.LightServ = c.LightServ enc.LightPeers = c.LightPeers enc.SkipBcVersionCheck = c.SkipBcVersionCheck enc.DatabaseHandles = c.DatabaseHandles enc.DatabaseCache = c.DatabaseCache + enc.TrieCache = c.TrieCache + enc.TrieTimeout = c.TrieTimeout enc.Etherbase = c.Etherbase enc.MinerThreads = c.MinerThreads - enc.ExtraData = c.ExtraData - enc.GasPrice = c.GasPrice + enc.MinerNotify = c.MinerNotify + enc.MinerExtraData = c.MinerExtraData + enc.MinerGasPrice = c.MinerGasPrice + enc.MinerRecommit = c.MinerRecommit enc.Ethash = c.Ethash enc.TxPool = c.TxPool enc.GPO = c.GPO @@ -56,20 +68,26 @@ func (c Config) MarshalTOML() (interface{}, error) { return &enc, nil } +// UnmarshalTOML unmarshals from TOML. func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { type Config struct { Genesis *core.Genesis `toml:",omitempty"` NetworkId *uint64 SyncMode *downloader.SyncMode + NoPruning *bool LightServ *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"` SkipBcVersionCheck *bool `toml:"-"` DatabaseHandles *int `toml:"-"` DatabaseCache *int + TrieCache *int + TrieTimeout *time.Duration Etherbase *common.Address `toml:",omitempty"` MinerThreads *int `toml:",omitempty"` - ExtraData *hexutil.Bytes `toml:",omitempty"` - GasPrice *big.Int + MinerNotify []string `toml:",omitempty"` + MinerExtraData *hexutil.Bytes `toml:",omitempty"` + MinerGasPrice *big.Int + MinerRecommit *time.Duration Ethash *ethash.Config TxPool *core.TxPoolConfig GPO *gasprice.Config @@ -89,6 +107,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.SyncMode != nil { c.SyncMode = *dec.SyncMode } + if dec.NoPruning != nil { + c.NoPruning = *dec.NoPruning + } if dec.LightServ != nil { c.LightServ = *dec.LightServ } @@ -104,17 +125,29 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.DatabaseCache != nil { c.DatabaseCache = *dec.DatabaseCache } + if dec.TrieCache != nil { + c.TrieCache = *dec.TrieCache + } + if dec.TrieTimeout != nil { + c.TrieTimeout = *dec.TrieTimeout + } if dec.Etherbase != nil { c.Etherbase = *dec.Etherbase } if dec.MinerThreads != nil { c.MinerThreads = *dec.MinerThreads } - if dec.ExtraData != nil { - c.ExtraData = *dec.ExtraData + if dec.MinerNotify != nil { + c.MinerNotify = dec.MinerNotify + } + if dec.MinerExtraData != nil { + c.MinerExtraData = *dec.MinerExtraData + } + if dec.MinerGasPrice != nil { + c.MinerGasPrice = dec.MinerGasPrice } - if dec.GasPrice != nil { - c.GasPrice = dec.GasPrice + if dec.MinerRecommit != nil { + c.MinerRecommit = *dec.MinerRecommit } if dec.Ethash != nil { c.Ethash = *dec.Ethash diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 000e3728de..f4eb47a12a 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -519,6 +519,11 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.utils.fromDecimal] }), + new web3._extend.Method({ + name: 'setRecommitInterval', + call: 'miner_setRecommitInterval', + params: 1, + }), new web3._extend.Method({ name: 'getHashrate', call: 'miner_getHashrate' diff --git a/les/backend.go b/les/backend.go index d26c1470fe..00025ba634 100644 --- a/les/backend.go +++ b/les/backend.go @@ -141,7 +141,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { leth.ApiBackend = &LesApiBackend{leth, nil} gpoParams := config.GPO if gpoParams.Default == nil { - gpoParams.Default = config.GasPrice + gpoParams.Default = config.MinerGasPrice } leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams) return leth, nil diff --git a/miner/miner.go b/miner/miner.go index e350e456e9..c5a0c9d62a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -20,6 +20,7 @@ package miner import ( "fmt" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -51,13 +52,13 @@ type Miner struct { shouldStart int32 // should start indicates whether we should start after sync } -func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner { +func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration) *Miner { miner := &Miner{ eth: eth, mux: mux, engine: engine, exitCh: make(chan struct{}), - worker: newWorker(config, engine, eth, mux), + worker: newWorker(config, engine, eth, mux, recommit), canStart: 1, } go miner.update() @@ -144,6 +145,11 @@ func (self *Miner) SetExtra(extra []byte) error { return nil } +// SetRecommitInterval sets the interval for sealing work resubmitting. +func (self *Miner) SetRecommitInterval(interval time.Duration) { + self.worker.setRecommitInterval(interval) +} + // Pending returns the currently pending block and associated state. func (self *Miner) Pending() (*types.Block, *state.StateDB) { return self.worker.pending() diff --git a/miner/worker.go b/miner/worker.go index 23cfaf2256..c299ff9dc1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -51,12 +51,27 @@ const ( // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 + // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. + resubmitAdjustChanSize = 10 + // miningLogAtDepth is the number of confirmations before logging successful mining. miningLogAtDepth = 5 - // blockRecommitInterval is the time interval to recreate the mining block with + // minRecommitInterval is the minimal time interval to recreate the mining block with + // any newly arrived transactions. + minRecommitInterval = 1 * time.Second + + // maxRecommitInterval is the maximum time interval to recreate the mining block with // any newly arrived transactions. - blockRecommitInterval = 3 * time.Second + maxRecommitInterval = 15 * time.Second + + // intervalAdjustRatio is the impact a single interval adjustment has on sealing work + // resubmitting interval. + intervalAdjustRatio = 0.1 + + // intervalAdjustBias is applied during the new resubmit interval calculation in favor of + // increasing upper limit or decreasing lower limit so that the limit can be reachable. + intervalAdjustBias = 200 * 1000.0 * 1000.0 ) // environment is the worker's current environment and holds all of the current state information. @@ -89,11 +104,18 @@ const ( commitInterruptResubmit ) +// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { interrupt *int32 noempty bool } +// intervalAdjust represents a resubmitting interval adjustment. +type intervalAdjust struct { + ratio float64 + inc bool +} + // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { @@ -112,11 +134,13 @@ type worker struct { chainSideSub event.Subscription // Channels - newWorkCh chan *newWorkReq - taskCh chan *task - resultCh chan *task - startCh chan struct{} - exitCh chan struct{} + newWorkCh chan *newWorkReq + taskCh chan *task + resultCh chan *task + startCh chan struct{} + exitCh chan struct{} + resubmitIntervalCh chan time.Duration + resubmitAdjustCh chan *intervalAdjust current *environment // An environment for current running cycle. possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. @@ -132,30 +156,34 @@ type worker struct { // atomic status counters running int32 // The indicator whether the consensus engine is running or not. + newTxs int32 // New arrival transaction count since last sealing work submitting. // Test hooks - newTaskHook func(*task) // Method to call upon receiving a new sealing task - skipSealHook func(*task) bool // Method to decide whether skipping the sealing. - fullTaskHook func() // Method to call before pushing the full sealing task + newTaskHook func(*task) // Method to call upon receiving a new sealing task. + skipSealHook func(*task) bool // Method to decide whether skipping the sealing. + fullTaskHook func() // Method to call before pushing the full sealing task. + resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. } -func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration) *worker { worker := &worker{ - config: config, - engine: engine, - eth: eth, - mux: mux, - chain: eth.BlockChain(), - possibleUncles: make(map[common.Hash]*types.Block), - unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), - taskCh: make(chan *task), - resultCh: make(chan *task, resultQueueSize), - exitCh: make(chan struct{}), - startCh: make(chan struct{}, 1), + config: config, + engine: engine, + eth: eth, + mux: mux, + chain: eth.BlockChain(), + possibleUncles: make(map[common.Hash]*types.Block), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWorkCh: make(chan *newWorkReq), + taskCh: make(chan *task), + resultCh: make(chan *task, resultQueueSize), + exitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + resubmitIntervalCh: make(chan time.Duration), + resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) @@ -163,8 +191,14 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) + // Sanitize recommit interval if the user-specified one is too short. + if recommit < minRecommitInterval { + log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) + recommit = minRecommitInterval + } + go worker.mainLoop() - go worker.newWorkLoop() + go worker.newWorkLoop(recommit) go worker.resultLoop() go worker.taskLoop() @@ -188,6 +222,11 @@ func (w *worker) setExtra(extra []byte) { w.extra = extra } +// setRecommitInterval updates the interval for miner sealing work recommitting. +func (w *worker) setRecommitInterval(interval time.Duration) { + w.resubmitIntervalCh <- interval +} + // pending returns the pending state and corresponding block. func (w *worker) pending() (*types.Block, *state.StateDB) { // return a snapshot to avoid contention on currentMu mutex @@ -238,35 +277,94 @@ func (w *worker) close() { } // newWorkLoop is a standalone goroutine to submit new mining work upon received events. -func (w *worker) newWorkLoop() { - var interrupt *int32 +func (w *worker) newWorkLoop(recommit time.Duration) { + var ( + interrupt *int32 + minRecommit = recommit // minimal resubmit interval specified by user. + ) timer := time.NewTimer(0) <-timer.C // discard the initial tick - // recommit aborts in-flight transaction execution with given signal and resubmits a new one. - recommit := func(noempty bool, s int32) { + // commit aborts in-flight transaction execution with given signal and resubmits a new one. + commit := func(noempty bool, s int32) { if interrupt != nil { atomic.StoreInt32(interrupt, s) } interrupt = new(int32) w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty} - timer.Reset(blockRecommitInterval) + timer.Reset(recommit) + atomic.StoreInt32(&w.newTxs, 0) + } + // recalcRecommit recalculates the resubmitting interval upon feedback. + recalcRecommit := func(target float64, inc bool) { + var ( + prev = float64(recommit.Nanoseconds()) + next float64 + ) + if inc { + next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) + // Recap if interval is larger than the maximum time interval + if next > float64(maxRecommitInterval.Nanoseconds()) { + next = float64(maxRecommitInterval.Nanoseconds()) + } + } else { + next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) + // Recap if interval is less than the user specified minimum + if next < float64(minRecommit.Nanoseconds()) { + next = float64(minRecommit.Nanoseconds()) + } + } + recommit = time.Duration(int64(next)) } for { select { case <-w.startCh: - recommit(false, commitInterruptNewHead) + commit(false, commitInterruptNewHead) case <-w.chainHeadCh: - recommit(false, commitInterruptNewHead) + commit(false, commitInterruptNewHead) case <-timer.C: // If mining is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) { - recommit(true, commitInterruptResubmit) + // Short circuit if no new transaction arrives. + if atomic.LoadInt32(&w.newTxs) == 0 { + timer.Reset(recommit) + continue + } + commit(true, commitInterruptResubmit) + } + + case interval := <-w.resubmitIntervalCh: + // Adjust resubmit interval explicitly by user. + if interval < minRecommitInterval { + log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval) + interval = minRecommitInterval + } + log.Info("Miner recommit interval update", "from", minRecommit, "to", interval) + minRecommit, recommit = interval, interval + + if w.resubmitHook != nil { + w.resubmitHook(minRecommit, recommit) + } + + case adjust := <-w.resubmitAdjustCh: + // Adjust resubmit interval by feedback. + if adjust.inc { + before := recommit + recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true) + log.Trace("Increase miner recommit interval", "from", before, "to", recommit) + } else { + before := recommit + recalcRecommit(float64(minRecommit.Nanoseconds()), false) + log.Trace("Decrease miner recommit interval", "from", before, "to", recommit) + } + + if w.resubmitHook != nil { + w.resubmitHook(minRecommit, recommit) } case <-w.exitCh: @@ -339,6 +437,7 @@ func (w *worker) mainLoop() { w.commitNewWork(nil, false) } } + atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) // System stopped case <-w.exitCh: @@ -383,7 +482,10 @@ func (w *worker) seal(t *task, stop <-chan struct{}) { // taskLoop is a standalone goroutine to fetch sealing task from the generator and // push them to consensus engine. func (w *worker) taskLoop() { - var stopCh chan struct{} + var ( + stopCh chan struct{} + prev common.Hash + ) // interrupt aborts the in-flight sealing task. interrupt := func() { @@ -398,8 +500,13 @@ func (w *worker) taskLoop() { if w.newTaskHook != nil { w.newTaskHook(task) } + // Reject duplicate sealing work due to resubmitting. + if task.block.HashNoNonce() == prev { + continue + } interrupt() stopCh = make(chan struct{}) + prev = task.block.HashNoNonce() go w.seal(task, stopCh) case <-w.exitCh: interrupt() @@ -414,11 +521,15 @@ func (w *worker) resultLoop() { for { select { case result := <-w.resultCh: + // Short circuit when receiving empty result. if result == nil { continue } + // Short circuit when receiving duplicate result caused by resubmitting. block := result.block - + if w.chain.HasBlock(block.Hash(), block.NumberU64()) { + continue + } // Update the block hash in all logs since it is now available and not when the // receipt/log of individual transactions were created. for _, r := range result.receipts { @@ -568,8 +679,18 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. - // TODO(rjl493456442) give feedback to newWorkLoop to adjust resubmit interval if it is too short. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + if atomic.LoadInt32(interrupt) == commitInterruptResubmit { + ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit) + if ratio < 0.1 { + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done @@ -644,6 +765,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } go w.mux.Post(core.PendingLogsEvent{Logs: cpy}) } + // Notify resubmit loop to decrease resubmitting interval if current interval is larger + // than the user-specified one. + if interrupt != nil { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } return false } diff --git a/miner/worker_test.go b/miner/worker_test.go index 34bb7f5f33..16708c18c6 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -119,7 +119,7 @@ func (b *testWorkerBackend) PostChainEvents(events []interface{}) { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) (*worker, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine) backend.txPool.AddLocals(pendingTxs) - w := newWorker(chainConfig, engine, backend, new(event.TypeMux)) + w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second) w.setEtherbase(testBankAddress) return w, backend } @@ -327,7 +327,7 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en } } b.txPool.AddLocals(newTxs) - time.Sleep(3 * time.Second) + time.Sleep(time.Second) select { case <-taskCh: @@ -335,3 +335,105 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en t.Error("new task timeout") } } + +func TestAdjustIntervalEthash(t *testing.T) { + testAdjustInterval(t, ethashChainConfig, ethash.NewFaker()) +} + +func TestAdjustIntervalClique(t *testing.T) { + testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase())) +} + +func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { + defer engine.Close() + + w, _ := newTestWorker(t, chainConfig, engine) + defer w.close() + + w.skipSealHook = func(task *task) bool { + return true + } + w.fullTaskHook = func() { + time.Sleep(100 * time.Millisecond) + } + var ( + progress = make(chan struct{}, 10) + result = make([]float64, 0, 10) + index = 0 + start = false + ) + w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { + // Short circuit if interval checking hasn't started. + if !start { + return + } + var wantMinInterval, wantRecommitInterval time.Duration + + switch index { + case 0: + wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second + case 1: + origin := float64(3 * time.Second.Nanoseconds()) + estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias) + wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(int(estimate))*time.Nanosecond + case 2: + estimate := result[index-1] + min := float64(3 * time.Second.Nanoseconds()) + estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias) + wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(int(estimate))*time.Nanosecond + case 3: + wantMinInterval, wantRecommitInterval = time.Second, time.Second + } + + // Check interval + if minInterval != wantMinInterval { + t.Errorf("resubmit min interval mismatch want %s has %s", wantMinInterval, minInterval) + } + if recommitInterval != wantRecommitInterval { + t.Errorf("resubmit interval mismatch want %s has %s", wantRecommitInterval, recommitInterval) + } + result = append(result, float64(recommitInterval.Nanoseconds())) + index += 1 + progress <- struct{}{} + } + // Ensure worker has finished initialization + for { + b := w.pendingBlock() + if b != nil && b.NumberU64() == 1 { + break + } + } + + w.start() + + time.Sleep(time.Second) + + start = true + w.setRecommitInterval(3 * time.Second) + select { + case <-progress: + case <-time.NewTimer(time.Second).C: + t.Error("interval reset timeout") + } + + w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8} + select { + case <-progress: + case <-time.NewTimer(time.Second).C: + t.Error("interval reset timeout") + } + + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + select { + case <-progress: + case <-time.NewTimer(time.Second).C: + t.Error("interval reset timeout") + } + + w.setRecommitInterval(500 * time.Millisecond) + select { + case <-progress: + case <-time.NewTimer(time.Second).C: + t.Error("interval reset timeout") + } +}