cmd, eth, miner: make recommit configurable (#17444)

* cmd, eth, miner: make recommit configurable

* cmd, eth, les, miner: polish a bit

* miner: filter duplicate sealing work

* cmd: remove uncessary conversion

* miner: avoid microptimization in favor of cleaner code
pull/17475/head
gary rong 6 years ago committed by Péter Szilágyi
parent 522cfc68ff
commit b2c644ffb5
  1. 1
      cmd/geth/main.go
  2. 1
      cmd/geth/usage.go
  3. 22
      cmd/utils/flags.go
  4. 6
      eth/api.go
  5. 8
      eth/backend.go
  6. 15
      eth/config.go
  7. 53
      eth/gen_config.go
  8. 5
      internal/web3ext/web3ext.go
  9. 2
      les/backend.go
  10. 10
      miner/miner.go
  11. 202
      miner/worker.go
  12. 106
      miner/worker_test.go

@ -106,6 +106,7 @@ var (
utils.MinerLegacyEtherbaseFlag, utils.MinerLegacyEtherbaseFlag,
utils.MinerExtraDataFlag, utils.MinerExtraDataFlag,
utils.MinerLegacyExtraDataFlag, utils.MinerLegacyExtraDataFlag,
utils.MinerRecommitIntervalFlag,
utils.NATFlag, utils.NATFlag,
utils.NoDiscoverFlag, utils.NoDiscoverFlag,
utils.DiscoveryV5Flag, utils.DiscoveryV5Flag,

@ -190,6 +190,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.MinerGasTargetFlag, utils.MinerGasTargetFlag,
utils.MinerEtherbaseFlag, utils.MinerEtherbaseFlag,
utils.MinerExtraDataFlag, utils.MinerExtraDataFlag,
utils.MinerRecommitIntervalFlag,
}, },
}, },
{ {

@ -335,12 +335,12 @@ var (
MinerGasPriceFlag = BigFlag{ MinerGasPriceFlag = BigFlag{
Name: "miner.gasprice", Name: "miner.gasprice",
Usage: "Minimal gas price for mining a transactions", Usage: "Minimal gas price for mining a transactions",
Value: eth.DefaultConfig.GasPrice, Value: eth.DefaultConfig.MinerGasPrice,
} }
MinerLegacyGasPriceFlag = BigFlag{ MinerLegacyGasPriceFlag = BigFlag{
Name: "gasprice", Name: "gasprice",
Usage: "Minimal gas price for mining a transactions (deprecated, use --miner.gasprice)", Usage: "Minimal gas price for mining a transactions (deprecated, use --miner.gasprice)",
Value: eth.DefaultConfig.GasPrice, Value: eth.DefaultConfig.MinerGasPrice,
} }
MinerEtherbaseFlag = cli.StringFlag{ MinerEtherbaseFlag = cli.StringFlag{
Name: "miner.etherbase", Name: "miner.etherbase",
@ -360,6 +360,11 @@ var (
Name: "extradata", Name: "extradata",
Usage: "Block extra data set by the miner (default = client version, deprecated, use --miner.extradata)", Usage: "Block extra data set by the miner (default = client version, deprecated, use --miner.extradata)",
} }
MinerRecommitIntervalFlag = cli.DurationFlag{
Name: "miner.recommit",
Usage: "Time interval to recreate the block being mined.",
Value: 3 * time.Second,
}
// Account settings // Account settings
UnlockedAccountFlag = cli.StringFlag{ UnlockedAccountFlag = cli.StringFlag{
Name: "unlock", Name: "unlock",
@ -1124,16 +1129,19 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name) cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name)
} }
if ctx.GlobalIsSet(MinerLegacyExtraDataFlag.Name) { if ctx.GlobalIsSet(MinerLegacyExtraDataFlag.Name) {
cfg.ExtraData = []byte(ctx.GlobalString(MinerLegacyExtraDataFlag.Name)) cfg.MinerExtraData = []byte(ctx.GlobalString(MinerLegacyExtraDataFlag.Name))
} }
if ctx.GlobalIsSet(MinerExtraDataFlag.Name) { if ctx.GlobalIsSet(MinerExtraDataFlag.Name) {
cfg.ExtraData = []byte(ctx.GlobalString(MinerExtraDataFlag.Name)) cfg.MinerExtraData = []byte(ctx.GlobalString(MinerExtraDataFlag.Name))
} }
if ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) { if ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) {
cfg.GasPrice = GlobalBig(ctx, MinerLegacyGasPriceFlag.Name) cfg.MinerGasPrice = GlobalBig(ctx, MinerLegacyGasPriceFlag.Name)
} }
if ctx.GlobalIsSet(MinerGasPriceFlag.Name) { if ctx.GlobalIsSet(MinerGasPriceFlag.Name) {
cfg.GasPrice = GlobalBig(ctx, MinerGasPriceFlag.Name) cfg.MinerGasPrice = GlobalBig(ctx, MinerGasPriceFlag.Name)
}
if ctx.GlobalIsSet(MinerRecommitIntervalFlag.Name) {
cfg.MinerRecommit = ctx.Duration(MinerRecommitIntervalFlag.Name)
} }
if ctx.GlobalIsSet(VMEnableDebugFlag.Name) { if ctx.GlobalIsSet(VMEnableDebugFlag.Name) {
// TODO(fjl): force-enable this in --dev mode // TODO(fjl): force-enable this in --dev mode
@ -1176,7 +1184,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.Genesis = core.DeveloperGenesisBlock(uint64(ctx.GlobalInt(DeveloperPeriodFlag.Name)), developer.Address) cfg.Genesis = core.DeveloperGenesisBlock(uint64(ctx.GlobalInt(DeveloperPeriodFlag.Name)), developer.Address)
if !ctx.GlobalIsSet(MinerGasPriceFlag.Name) && !ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) { if !ctx.GlobalIsSet(MinerGasPriceFlag.Name) && !ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) {
cfg.GasPrice = big.NewInt(1) cfg.MinerGasPrice = big.NewInt(1)
} }
} }
// TODO(fjl): move trie cache generations into config // TODO(fjl): move trie cache generations into config

@ -25,6 +25,7 @@ import (
"math/big" "math/big"
"os" "os"
"strings" "strings"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
@ -160,6 +161,11 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool {
return true return true
} }
// SetRecommitInterval updates the interval for miner sealing work recommitting.
func (api *PrivateMinerAPI) SetRecommitInterval(interval int) {
api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond)
}
// GetHashrate returns the current hashrate of the miner. // GetHashrate returns the current hashrate of the miner.
func (api *PrivateMinerAPI) GetHashrate() uint64 { func (api *PrivateMinerAPI) GetHashrate() uint64 {
return api.e.miner.HashRate() return api.e.miner.HashRate()

@ -127,7 +127,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb), engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb),
shutdownChan: make(chan bool), shutdownChan: make(chan bool),
networkID: config.NetworkId, networkID: config.NetworkId,
gasPrice: config.GasPrice, gasPrice: config.MinerGasPrice,
etherbase: config.Etherbase, etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval), bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms), bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
@ -167,13 +167,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
return nil, err return nil, err
} }
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit)
eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.miner.SetExtra(makeExtraData(config.MinerExtraData))
eth.APIBackend = &EthAPIBackend{eth, nil} eth.APIBackend = &EthAPIBackend{eth, nil}
gpoParams := config.GPO gpoParams := config.GPO
if gpoParams.Default == nil { if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice gpoParams.Default = config.MinerGasPrice
} }
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

@ -48,7 +48,7 @@ var DefaultConfig = Config{
DatabaseCache: 768, DatabaseCache: 768,
TrieCache: 256, TrieCache: 256,
TrieTimeout: 60 * time.Minute, TrieTimeout: 60 * time.Minute,
GasPrice: big.NewInt(18 * params.Shannon), MinerGasPrice: big.NewInt(18 * params.Shannon),
TxPool: core.DefaultTxPoolConfig, TxPool: core.DefaultTxPoolConfig,
GPO: gasprice.Config{ GPO: gasprice.Config{
@ -95,11 +95,12 @@ type Config struct {
TrieTimeout time.Duration TrieTimeout time.Duration
// Mining-related options // Mining-related options
Etherbase common.Address `toml:",omitempty"` Etherbase common.Address `toml:",omitempty"`
MinerThreads int `toml:",omitempty"` MinerThreads int `toml:",omitempty"`
MinerNotify []string `toml:",omitempty"` MinerNotify []string `toml:",omitempty"`
ExtraData []byte `toml:",omitempty"` MinerExtraData []byte `toml:",omitempty"`
GasPrice *big.Int MinerGasPrice *big.Int
MinerRecommit time.Duration
// Ethash options // Ethash options
Ethash ethash.Config Ethash ethash.Config
@ -118,5 +119,5 @@ type Config struct {
} }
type configMarshaling struct { type configMarshaling struct {
ExtraData hexutil.Bytes MinerExtraData hexutil.Bytes
} }

@ -4,6 +4,7 @@ package eth
import ( import (
"math/big" "math/big"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
@ -15,20 +16,26 @@ import (
var _ = (*configMarshaling)(nil) var _ = (*configMarshaling)(nil)
// MarshalTOML marshals as TOML.
func (c Config) MarshalTOML() (interface{}, error) { func (c Config) MarshalTOML() (interface{}, error) {
type Config struct { type Config struct {
Genesis *core.Genesis `toml:",omitempty"` Genesis *core.Genesis `toml:",omitempty"`
NetworkId uint64 NetworkId uint64
SyncMode downloader.SyncMode SyncMode downloader.SyncMode
NoPruning bool
LightServ int `toml:",omitempty"` LightServ int `toml:",omitempty"`
LightPeers int `toml:",omitempty"` LightPeers int `toml:",omitempty"`
SkipBcVersionCheck bool `toml:"-"` SkipBcVersionCheck bool `toml:"-"`
DatabaseHandles int `toml:"-"` DatabaseHandles int `toml:"-"`
DatabaseCache int DatabaseCache int
TrieCache int
TrieTimeout time.Duration
Etherbase common.Address `toml:",omitempty"` Etherbase common.Address `toml:",omitempty"`
MinerThreads int `toml:",omitempty"` MinerThreads int `toml:",omitempty"`
ExtraData hexutil.Bytes `toml:",omitempty"` MinerNotify []string `toml:",omitempty"`
GasPrice *big.Int MinerExtraData hexutil.Bytes `toml:",omitempty"`
MinerGasPrice *big.Int
MinerRecommit time.Duration
Ethash ethash.Config Ethash ethash.Config
TxPool core.TxPoolConfig TxPool core.TxPoolConfig
GPO gasprice.Config GPO gasprice.Config
@ -39,15 +46,20 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.Genesis = c.Genesis enc.Genesis = c.Genesis
enc.NetworkId = c.NetworkId enc.NetworkId = c.NetworkId
enc.SyncMode = c.SyncMode enc.SyncMode = c.SyncMode
enc.NoPruning = c.NoPruning
enc.LightServ = c.LightServ enc.LightServ = c.LightServ
enc.LightPeers = c.LightPeers enc.LightPeers = c.LightPeers
enc.SkipBcVersionCheck = c.SkipBcVersionCheck enc.SkipBcVersionCheck = c.SkipBcVersionCheck
enc.DatabaseHandles = c.DatabaseHandles enc.DatabaseHandles = c.DatabaseHandles
enc.DatabaseCache = c.DatabaseCache enc.DatabaseCache = c.DatabaseCache
enc.TrieCache = c.TrieCache
enc.TrieTimeout = c.TrieTimeout
enc.Etherbase = c.Etherbase enc.Etherbase = c.Etherbase
enc.MinerThreads = c.MinerThreads enc.MinerThreads = c.MinerThreads
enc.ExtraData = c.ExtraData enc.MinerNotify = c.MinerNotify
enc.GasPrice = c.GasPrice enc.MinerExtraData = c.MinerExtraData
enc.MinerGasPrice = c.MinerGasPrice
enc.MinerRecommit = c.MinerRecommit
enc.Ethash = c.Ethash enc.Ethash = c.Ethash
enc.TxPool = c.TxPool enc.TxPool = c.TxPool
enc.GPO = c.GPO enc.GPO = c.GPO
@ -56,20 +68,26 @@ func (c Config) MarshalTOML() (interface{}, error) {
return &enc, nil return &enc, nil
} }
// UnmarshalTOML unmarshals from TOML.
func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
type Config struct { type Config struct {
Genesis *core.Genesis `toml:",omitempty"` Genesis *core.Genesis `toml:",omitempty"`
NetworkId *uint64 NetworkId *uint64
SyncMode *downloader.SyncMode SyncMode *downloader.SyncMode
NoPruning *bool
LightServ *int `toml:",omitempty"` LightServ *int `toml:",omitempty"`
LightPeers *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"`
SkipBcVersionCheck *bool `toml:"-"` SkipBcVersionCheck *bool `toml:"-"`
DatabaseHandles *int `toml:"-"` DatabaseHandles *int `toml:"-"`
DatabaseCache *int DatabaseCache *int
TrieCache *int
TrieTimeout *time.Duration
Etherbase *common.Address `toml:",omitempty"` Etherbase *common.Address `toml:",omitempty"`
MinerThreads *int `toml:",omitempty"` MinerThreads *int `toml:",omitempty"`
ExtraData *hexutil.Bytes `toml:",omitempty"` MinerNotify []string `toml:",omitempty"`
GasPrice *big.Int MinerExtraData *hexutil.Bytes `toml:",omitempty"`
MinerGasPrice *big.Int
MinerRecommit *time.Duration
Ethash *ethash.Config Ethash *ethash.Config
TxPool *core.TxPoolConfig TxPool *core.TxPoolConfig
GPO *gasprice.Config GPO *gasprice.Config
@ -89,6 +107,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.SyncMode != nil { if dec.SyncMode != nil {
c.SyncMode = *dec.SyncMode c.SyncMode = *dec.SyncMode
} }
if dec.NoPruning != nil {
c.NoPruning = *dec.NoPruning
}
if dec.LightServ != nil { if dec.LightServ != nil {
c.LightServ = *dec.LightServ c.LightServ = *dec.LightServ
} }
@ -104,17 +125,29 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.DatabaseCache != nil { if dec.DatabaseCache != nil {
c.DatabaseCache = *dec.DatabaseCache c.DatabaseCache = *dec.DatabaseCache
} }
if dec.TrieCache != nil {
c.TrieCache = *dec.TrieCache
}
if dec.TrieTimeout != nil {
c.TrieTimeout = *dec.TrieTimeout
}
if dec.Etherbase != nil { if dec.Etherbase != nil {
c.Etherbase = *dec.Etherbase c.Etherbase = *dec.Etherbase
} }
if dec.MinerThreads != nil { if dec.MinerThreads != nil {
c.MinerThreads = *dec.MinerThreads c.MinerThreads = *dec.MinerThreads
} }
if dec.ExtraData != nil { if dec.MinerNotify != nil {
c.ExtraData = *dec.ExtraData c.MinerNotify = dec.MinerNotify
}
if dec.MinerExtraData != nil {
c.MinerExtraData = *dec.MinerExtraData
}
if dec.MinerGasPrice != nil {
c.MinerGasPrice = dec.MinerGasPrice
} }
if dec.GasPrice != nil { if dec.MinerRecommit != nil {
c.GasPrice = dec.GasPrice c.MinerRecommit = *dec.MinerRecommit
} }
if dec.Ethash != nil { if dec.Ethash != nil {
c.Ethash = *dec.Ethash c.Ethash = *dec.Ethash

@ -519,6 +519,11 @@ web3._extend({
params: 1, params: 1,
inputFormatter: [web3._extend.utils.fromDecimal] inputFormatter: [web3._extend.utils.fromDecimal]
}), }),
new web3._extend.Method({
name: 'setRecommitInterval',
call: 'miner_setRecommitInterval',
params: 1,
}),
new web3._extend.Method({ new web3._extend.Method({
name: 'getHashrate', name: 'getHashrate',
call: 'miner_getHashrate' call: 'miner_getHashrate'

@ -141,7 +141,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
leth.ApiBackend = &LesApiBackend{leth, nil} leth.ApiBackend = &LesApiBackend{leth, nil}
gpoParams := config.GPO gpoParams := config.GPO
if gpoParams.Default == nil { if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice gpoParams.Default = config.MinerGasPrice
} }
leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams) leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams)
return leth, nil return leth, nil

@ -20,6 +20,7 @@ package miner
import ( import (
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
@ -51,13 +52,13 @@ type Miner struct {
shouldStart int32 // should start indicates whether we should start after sync shouldStart int32 // should start indicates whether we should start after sync
} }
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner { func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration) *Miner {
miner := &Miner{ miner := &Miner{
eth: eth, eth: eth,
mux: mux, mux: mux,
engine: engine, engine: engine,
exitCh: make(chan struct{}), exitCh: make(chan struct{}),
worker: newWorker(config, engine, eth, mux), worker: newWorker(config, engine, eth, mux, recommit),
canStart: 1, canStart: 1,
} }
go miner.update() go miner.update()
@ -144,6 +145,11 @@ func (self *Miner) SetExtra(extra []byte) error {
return nil return nil
} }
// SetRecommitInterval sets the interval for sealing work resubmitting.
func (self *Miner) SetRecommitInterval(interval time.Duration) {
self.worker.setRecommitInterval(interval)
}
// Pending returns the currently pending block and associated state. // Pending returns the currently pending block and associated state.
func (self *Miner) Pending() (*types.Block, *state.StateDB) { func (self *Miner) Pending() (*types.Block, *state.StateDB) {
return self.worker.pending() return self.worker.pending()

@ -51,12 +51,27 @@ const (
// chainSideChanSize is the size of channel listening to ChainSideEvent. // chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10 chainSideChanSize = 10
// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
resubmitAdjustChanSize = 10
// miningLogAtDepth is the number of confirmations before logging successful mining. // miningLogAtDepth is the number of confirmations before logging successful mining.
miningLogAtDepth = 5 miningLogAtDepth = 5
// blockRecommitInterval is the time interval to recreate the mining block with // minRecommitInterval is the minimal time interval to recreate the mining block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second
// maxRecommitInterval is the maximum time interval to recreate the mining block with
// any newly arrived transactions. // any newly arrived transactions.
blockRecommitInterval = 3 * time.Second maxRecommitInterval = 15 * time.Second
// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
// resubmitting interval.
intervalAdjustRatio = 0.1
// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0
) )
// environment is the worker's current environment and holds all of the current state information. // environment is the worker's current environment and holds all of the current state information.
@ -89,11 +104,18 @@ const (
commitInterruptResubmit commitInterruptResubmit
) )
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct { type newWorkReq struct {
interrupt *int32 interrupt *int32
noempty bool noempty bool
} }
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
ratio float64
inc bool
}
// worker is the main object which takes care of submitting new work to consensus engine // worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result. // and gathering the sealing result.
type worker struct { type worker struct {
@ -112,11 +134,13 @@ type worker struct {
chainSideSub event.Subscription chainSideSub event.Subscription
// Channels // Channels
newWorkCh chan *newWorkReq newWorkCh chan *newWorkReq
taskCh chan *task taskCh chan *task
resultCh chan *task resultCh chan *task
startCh chan struct{} startCh chan struct{}
exitCh chan struct{} exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
current *environment // An environment for current running cycle. current *environment // An environment for current running cycle.
possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
@ -132,30 +156,34 @@ type worker struct {
// atomic status counters // atomic status counters
running int32 // The indicator whether the consensus engine is running or not. running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
// Test hooks // Test hooks
newTaskHook func(*task) // Method to call upon receiving a new sealing task newTaskHook func(*task) // Method to call upon receiving a new sealing task.
skipSealHook func(*task) bool // Method to decide whether skipping the sealing. skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
fullTaskHook func() // Method to call before pushing the full sealing task fullTaskHook func() // Method to call before pushing the full sealing task.
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
} }
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration) *worker {
worker := &worker{ worker := &worker{
config: config, config: config,
engine: engine, engine: engine,
eth: eth, eth: eth,
mux: mux, mux: mux,
chain: eth.BlockChain(), chain: eth.BlockChain(),
possibleUncles: make(map[common.Hash]*types.Block), possibleUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
txsCh: make(chan core.NewTxsEvent, txChanSize), txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq), newWorkCh: make(chan *newWorkReq),
taskCh: make(chan *task), taskCh: make(chan *task),
resultCh: make(chan *task, resultQueueSize), resultCh: make(chan *task, resultQueueSize),
exitCh: make(chan struct{}), exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1), startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
} }
// Subscribe NewTxsEvent for tx pool // Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
@ -163,8 +191,14 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
// Sanitize recommit interval if the user-specified one is too short.
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}
go worker.mainLoop() go worker.mainLoop()
go worker.newWorkLoop() go worker.newWorkLoop(recommit)
go worker.resultLoop() go worker.resultLoop()
go worker.taskLoop() go worker.taskLoop()
@ -188,6 +222,11 @@ func (w *worker) setExtra(extra []byte) {
w.extra = extra w.extra = extra
} }
// setRecommitInterval updates the interval for miner sealing work recommitting.
func (w *worker) setRecommitInterval(interval time.Duration) {
w.resubmitIntervalCh <- interval
}
// pending returns the pending state and corresponding block. // pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) { func (w *worker) pending() (*types.Block, *state.StateDB) {
// return a snapshot to avoid contention on currentMu mutex // return a snapshot to avoid contention on currentMu mutex
@ -238,35 +277,94 @@ func (w *worker) close() {
} }
// newWorkLoop is a standalone goroutine to submit new mining work upon received events. // newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop() { func (w *worker) newWorkLoop(recommit time.Duration) {
var interrupt *int32 var (
interrupt *int32
minRecommit = recommit // minimal resubmit interval specified by user.
)
timer := time.NewTimer(0) timer := time.NewTimer(0)
<-timer.C // discard the initial tick <-timer.C // discard the initial tick
// recommit aborts in-flight transaction execution with given signal and resubmits a new one. // commit aborts in-flight transaction execution with given signal and resubmits a new one.
recommit := func(noempty bool, s int32) { commit := func(noempty bool, s int32) {
if interrupt != nil { if interrupt != nil {
atomic.StoreInt32(interrupt, s) atomic.StoreInt32(interrupt, s)
} }
interrupt = new(int32) interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty} w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty}
timer.Reset(blockRecommitInterval) timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
// recalcRecommit recalculates the resubmitting interval upon feedback.
recalcRecommit := func(target float64, inc bool) {
var (
prev = float64(recommit.Nanoseconds())
next float64
)
if inc {
next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
// Recap if interval is larger than the maximum time interval
if next > float64(maxRecommitInterval.Nanoseconds()) {
next = float64(maxRecommitInterval.Nanoseconds())
}
} else {
next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
// Recap if interval is less than the user specified minimum
if next < float64(minRecommit.Nanoseconds()) {
next = float64(minRecommit.Nanoseconds())
}
}
recommit = time.Duration(int64(next))
} }
for { for {
select { select {
case <-w.startCh: case <-w.startCh:
recommit(false, commitInterruptNewHead) commit(false, commitInterruptNewHead)
case <-w.chainHeadCh: case <-w.chainHeadCh:
recommit(false, commitInterruptNewHead) commit(false, commitInterruptNewHead)
case <-timer.C: case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in // If mining is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks. // higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) { if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) {
recommit(true, commitInterruptResubmit) // Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 {
timer.Reset(recommit)
continue
}
commit(true, commitInterruptResubmit)
}
case interval := <-w.resubmitIntervalCh:
// Adjust resubmit interval explicitly by user.
if interval < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
interval = minRecommitInterval
}
log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
minRecommit, recommit = interval, interval
if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
}
case adjust := <-w.resubmitAdjustCh:
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
recalcRecommit(float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}
if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
} }
case <-w.exitCh: case <-w.exitCh:
@ -339,6 +437,7 @@ func (w *worker) mainLoop() {
w.commitNewWork(nil, false) w.commitNewWork(nil, false)
} }
} }
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
// System stopped // System stopped
case <-w.exitCh: case <-w.exitCh:
@ -383,7 +482,10 @@ func (w *worker) seal(t *task, stop <-chan struct{}) {
// taskLoop is a standalone goroutine to fetch sealing task from the generator and // taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine. // push them to consensus engine.
func (w *worker) taskLoop() { func (w *worker) taskLoop() {
var stopCh chan struct{} var (
stopCh chan struct{}
prev common.Hash
)
// interrupt aborts the in-flight sealing task. // interrupt aborts the in-flight sealing task.
interrupt := func() { interrupt := func() {
@ -398,8 +500,13 @@ func (w *worker) taskLoop() {
if w.newTaskHook != nil { if w.newTaskHook != nil {
w.newTaskHook(task) w.newTaskHook(task)
} }
// Reject duplicate sealing work due to resubmitting.
if task.block.HashNoNonce() == prev {
continue
}
interrupt() interrupt()
stopCh = make(chan struct{}) stopCh = make(chan struct{})
prev = task.block.HashNoNonce()
go w.seal(task, stopCh) go w.seal(task, stopCh)
case <-w.exitCh: case <-w.exitCh:
interrupt() interrupt()
@ -414,11 +521,15 @@ func (w *worker) resultLoop() {
for { for {
select { select {
case result := <-w.resultCh: case result := <-w.resultCh:
// Short circuit when receiving empty result.
if result == nil { if result == nil {
continue continue
} }
// Short circuit when receiving duplicate result caused by resubmitting.
block := result.block block := result.block
if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
continue
}
// Update the block hash in all logs since it is now available and not when the // Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created. // receipt/log of individual transactions were created.
for _, r := range result.receipts { for _, r := range result.receipts {
@ -568,8 +679,18 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded. // For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine. // For the third case, the semi-finished work will be submitted to the consensus engine.
// TODO(rjl493456442) give feedback to newWorkLoop to adjust resubmit interval if it is too short.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead return atomic.LoadInt32(interrupt) == commitInterruptNewHead
} }
// If we don't have enough gas for any further transactions then we're done // If we don't have enough gas for any further transactions then we're done
@ -644,6 +765,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
} }
go w.mux.Post(core.PendingLogsEvent{Logs: cpy}) go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
} }
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false return false
} }

@ -119,7 +119,7 @@ func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) (*worker, *testWorkerBackend) { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine) backend := newTestWorkerBackend(t, chainConfig, engine)
backend.txPool.AddLocals(pendingTxs) backend.txPool.AddLocals(pendingTxs)
w := newWorker(chainConfig, engine, backend, new(event.TypeMux)) w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second)
w.setEtherbase(testBankAddress) w.setEtherbase(testBankAddress)
return w, backend return w, backend
} }
@ -327,7 +327,7 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en
} }
} }
b.txPool.AddLocals(newTxs) b.txPool.AddLocals(newTxs)
time.Sleep(3 * time.Second) time.Sleep(time.Second)
select { select {
case <-taskCh: case <-taskCh:
@ -335,3 +335,105 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en
t.Error("new task timeout") t.Error("new task timeout")
} }
} }
func TestAdjustIntervalEthash(t *testing.T) {
testAdjustInterval(t, ethashChainConfig, ethash.NewFaker())
}
func TestAdjustIntervalClique(t *testing.T) {
testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase()))
}
func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()
w, _ := newTestWorker(t, chainConfig, engine)
defer w.close()
w.skipSealHook = func(task *task) bool {
return true
}
w.fullTaskHook = func() {
time.Sleep(100 * time.Millisecond)
}
var (
progress = make(chan struct{}, 10)
result = make([]float64, 0, 10)
index = 0
start = false
)
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
// Short circuit if interval checking hasn't started.
if !start {
return
}
var wantMinInterval, wantRecommitInterval time.Duration
switch index {
case 0:
wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second
case 1:
origin := float64(3 * time.Second.Nanoseconds())
estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(int(estimate))*time.Nanosecond
case 2:
estimate := result[index-1]
min := float64(3 * time.Second.Nanoseconds())
estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(int(estimate))*time.Nanosecond
case 3:
wantMinInterval, wantRecommitInterval = time.Second, time.Second
}
// Check interval
if minInterval != wantMinInterval {
t.Errorf("resubmit min interval mismatch want %s has %s", wantMinInterval, minInterval)
}
if recommitInterval != wantRecommitInterval {
t.Errorf("resubmit interval mismatch want %s has %s", wantRecommitInterval, recommitInterval)
}
result = append(result, float64(recommitInterval.Nanoseconds()))
index += 1
progress <- struct{}{}
}
// Ensure worker has finished initialization
for {
b := w.pendingBlock()
if b != nil && b.NumberU64() == 1 {
break
}
}
w.start()
time.Sleep(time.Second)
start = true
w.setRecommitInterval(3 * time.Second)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
w.setRecommitInterval(500 * time.Millisecond)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
}

Loading…
Cancel
Save