From d8e0807da22eb922539d15b0d5d01ccdd58b1267 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Wed, 6 Mar 2024 13:45:03 +0100 Subject: [PATCH] miner: refactor the miner, make the pending block on demand (#28623) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * miner: untangle miner * miner: use common.hash instead of *types.header * cmd/geth: deprecate --mine * eth: get rid of most miner api * console: get rid of coinbase in welcome message * miner/stress: get rid of the miner stress test * eth: get rid of miner.setEtherbase * ethstats: remove miner and hashrate flags * ethstats: remove miner and hashrate flags * cmd: rename pendingBlockProducer to miner.pending.feeRecipient flag * miner: use pendingFeeRecipient instead of etherbase * miner: add mutex to protect the pending block * miner: add mutex to protect the pending block * eth: get rid of etherbase mentions * miner: no need to lock the coinbase * eth, miner: fix linter --------- Co-authored-by: Martin Holst Swende Co-authored-by: Péter Szilágyi --- cmd/geth/consolecmd_test.go | 3 - cmd/geth/main.go | 26 +- cmd/utils/flags.go | 43 +- cmd/utils/flags_legacy.go | 20 + consensus/consensus.go | 8 - console/console.go | 3 - console/console_test.go | 5 +- eth/api.go | 52 - eth/api_backend.go | 23 +- eth/api_debug.go | 4 +- eth/api_miner.go | 28 - eth/backend.go | 152 +-- eth/catalyst/api_test.go | 5 +- eth/catalyst/simulated_beacon_test.go | 3 +- eth/filters/filter.go | 2 +- eth/filters/filter_system.go | 78 +- eth/filters/filter_system_test.go | 42 +- eth/filters/filter_test.go | 7 +- eth/gasprice/feehistory.go | 2 +- eth/gasprice/gasprice.go | 3 +- eth/gasprice/gasprice_test.go | 8 +- ethclient/ethclient_test.go | 23 +- ethstats/ethstats.go | 25 +- internal/ethapi/api_test.go | 5 +- internal/ethapi/backend.go | 3 +- internal/ethapi/transaction_args_test.go | 5 +- internal/web3ext/web3ext.go | 23 - miner/miner.go | 256 ++--- miner/miner_test.go | 204 +--- miner/payload_building.go | 9 +- miner/payload_building_test.go | 119 ++- miner/pending.go | 67 ++ miner/stress/clique/main.go | 223 ----- miner/worker.go | 1097 +++------------------- miner/worker_test.go | 510 ---------- 35 files changed, 598 insertions(+), 2488 deletions(-) delete mode 100644 eth/api.go create mode 100644 miner/pending.go delete mode 100644 miner/stress/clique/main.go delete mode 100644 miner/worker_test.go diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index ef6ef5f288..4d62206417 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -71,7 +71,6 @@ func TestConsoleWelcome(t *testing.T) { Welcome to the Geth JavaScript console! instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}} -coinbase: {{.Etherbase}} at block: 0 ({{niltime}}) datadir: {{.Datadir}} modules: {{apis}} @@ -131,7 +130,6 @@ func testAttachWelcome(t *testing.T, geth *testgeth, endpoint, apis string) { attach.SetTemplateFunc("goarch", func() string { return runtime.GOARCH }) attach.SetTemplateFunc("gover", runtime.Version) attach.SetTemplateFunc("gethver", func() string { return params.VersionWithCommit("", "") }) - attach.SetTemplateFunc("etherbase", func() string { return geth.Etherbase }) attach.SetTemplateFunc("niltime", func() string { return time.Unix(1548854791, 0).Format("Mon Jan 02 2006 15:04:05 GMT-0700 (MST)") }) @@ -144,7 +142,6 @@ func testAttachWelcome(t *testing.T, geth *testgeth, endpoint, apis string) { Welcome to the Geth JavaScript console! instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}} -coinbase: {{etherbase}} at block: 0 ({{niltime}}){{if ipc}} datadir: {{datadir}}{{end}} modules: {{apis}} diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2f7d37fdd7..9a88e9f2e8 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/console/prompt" - "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/internal/debug" @@ -116,13 +115,14 @@ var ( utils.DiscoveryPortFlag, utils.MaxPeersFlag, utils.MaxPendingPeersFlag, - utils.MiningEnabledFlag, + utils.MiningEnabledFlag, // deprecated utils.MinerGasLimitFlag, utils.MinerGasPriceFlag, - utils.MinerEtherbaseFlag, + utils.MinerEtherbaseFlag, // deprecated utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, - utils.MinerNewPayloadTimeout, + utils.MinerPendingFeeRecipientFlag, + utils.MinerNewPayloadTimeoutFlag, // deprecated utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV4Flag, @@ -421,24 +421,6 @@ func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend, isCon } }() } - - // Start auxiliary services if enabled - if ctx.Bool(utils.MiningEnabledFlag.Name) { - // Mining only makes sense if a full Ethereum node is running - if ctx.String(utils.SyncModeFlag.Name) == "light" { - utils.Fatalf("Light clients do not support mining") - } - ethBackend, ok := backend.(*eth.EthAPIBackend) - if !ok { - utils.Fatalf("Ethereum service not running") - } - // Set the gas price to the limits from the CLI and start mining - gasprice := flags.GlobalBig(ctx, utils.MinerGasPriceFlag.Name) - ethBackend.TxPool().SetGasTip(gasprice) - if err := ethBackend.StartMining(); err != nil { - utils.Fatalf("Failed to start mining: %v", err) - } - } } // unlockAccounts unlocks any account specifically requested. diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 82af26ff96..fad567cd55 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -425,11 +425,6 @@ var ( } // Miner settings - MiningEnabledFlag = &cli.BoolFlag{ - Name: "mine", - Usage: "Enable mining", - Category: flags.MinerCategory, - } MinerGasLimitFlag = &cli.Uint64Flag{ Name: "miner.gaslimit", Usage: "Target gas ceiling for mined blocks", @@ -442,11 +437,6 @@ var ( Value: ethconfig.Defaults.Miner.GasPrice, Category: flags.MinerCategory, } - MinerEtherbaseFlag = &cli.StringFlag{ - Name: "miner.etherbase", - Usage: "0x prefixed public address for block mining rewards", - Category: flags.MinerCategory, - } MinerExtraDataFlag = &cli.StringFlag{ Name: "miner.extradata", Usage: "Block extra data set by the miner (default = client version)", @@ -458,10 +448,9 @@ var ( Value: ethconfig.Defaults.Miner.Recommit, Category: flags.MinerCategory, } - MinerNewPayloadTimeout = &cli.DurationFlag{ - Name: "miner.newpayload-timeout", - Usage: "Specify the maximum time allowance for creating a new payload", - Value: ethconfig.Defaults.Miner.NewPayloadTimeout, + MinerPendingFeeRecipientFlag = &cli.StringFlag{ + Name: "miner.pending.feeRecipient", + Usage: "0x prefixed public address for the pending block producer (not used for actual block production)", Category: flags.MinerCategory, } @@ -1268,19 +1257,23 @@ func MakeAddress(ks *keystore.KeyStore, account string) (accounts.Account, error // setEtherbase retrieves the etherbase from the directly specified command line flags. func setEtherbase(ctx *cli.Context, cfg *ethconfig.Config) { - if !ctx.IsSet(MinerEtherbaseFlag.Name) { + if ctx.IsSet(MinerEtherbaseFlag.Name) { + log.Warn("Option --miner.etherbase is deprecated as the etherbase is set by the consensus client post-merge") return } - addr := ctx.String(MinerEtherbaseFlag.Name) + if !ctx.IsSet(MinerPendingFeeRecipientFlag.Name) { + return + } + addr := ctx.String(MinerPendingFeeRecipientFlag.Name) if strings.HasPrefix(addr, "0x") || strings.HasPrefix(addr, "0X") { addr = addr[2:] } b, err := hex.DecodeString(addr) if err != nil || len(b) != common.AddressLength { - Fatalf("-%s: invalid etherbase address %q", MinerEtherbaseFlag.Name, addr) + Fatalf("-%s: invalid pending block producer address %q", MinerPendingFeeRecipientFlag.Name, addr) return } - cfg.Miner.Etherbase = common.BytesToAddress(b) + cfg.Miner.PendingFeeRecipient = common.BytesToAddress(b) } // MakePasswordList reads password lines from the file specified by the global --password flag. @@ -1496,6 +1489,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { } func setMiner(ctx *cli.Context, cfg *miner.Config) { + if ctx.Bool(MiningEnabledFlag.Name) { + log.Warn("The flag --mine is deprecated and will be removed") + } if ctx.IsSet(MinerExtraDataFlag.Name) { cfg.ExtraData = []byte(ctx.String(MinerExtraDataFlag.Name)) } @@ -1508,8 +1504,9 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { if ctx.IsSet(MinerRecommitIntervalFlag.Name) { cfg.Recommit = ctx.Duration(MinerRecommitIntervalFlag.Name) } - if ctx.IsSet(MinerNewPayloadTimeout.Name) { - cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name) + if ctx.IsSet(MinerNewPayloadTimeoutFlag.Name) { + log.Warn("The flag --miner.newpayload-timeout is deprecated and will be removed, please use --miner.recommit") + cfg.Recommit = ctx.Duration(MinerNewPayloadTimeoutFlag.Name) } } @@ -1786,8 +1783,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { // Figure out the dev account address. // setEtherbase has been called above, configuring the miner address from command line flags. - if cfg.Miner.Etherbase != (common.Address{}) { - developer = accounts.Account{Address: cfg.Miner.Etherbase} + if cfg.Miner.PendingFeeRecipient != (common.Address{}) { + developer = accounts.Account{Address: cfg.Miner.PendingFeeRecipient} } else if accs := ks.Accounts(); len(accs) > 0 { developer = ks.Accounts()[0] } else { @@ -1798,7 +1795,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { } // Make sure the address is configured as fee recipient, otherwise // the miner will fail to start. - cfg.Miner.Etherbase = developer.Address + cfg.Miner.PendingFeeRecipient = developer.Address if err := ks.Unlock(developer, passphrase); err != nil { Fatalf("Failed to unlock developer account: %v", err) diff --git a/cmd/utils/flags_legacy.go b/cmd/utils/flags_legacy.go index 243abd8311..49321053c6 100644 --- a/cmd/utils/flags_legacy.go +++ b/cmd/utils/flags_legacy.go @@ -47,6 +47,9 @@ var DeprecatedFlags = []cli.Flag{ LightNoSyncServeFlag, LogBacktraceAtFlag, LogDebugFlag, + MinerNewPayloadTimeoutFlag, + MinerEtherbaseFlag, + MiningEnabledFlag, } var ( @@ -132,6 +135,23 @@ var ( Usage: "Prepends log messages with call-site location (deprecated)", Category: flags.DeprecatedCategory, } + // Deprecated February 2024 + MinerNewPayloadTimeoutFlag = &cli.DurationFlag{ + Name: "miner.newpayload-timeout", + Usage: "Specify the maximum time allowance for creating a new payload", + Value: ethconfig.Defaults.Miner.Recommit, + Category: flags.MinerCategory, + } + MinerEtherbaseFlag = &cli.StringFlag{ + Name: "miner.etherbase", + Usage: "0x prefixed public address for block mining rewards", + Category: flags.MinerCategory, + } + MiningEnabledFlag = &cli.BoolFlag{ + Name: "mine", + Usage: "Enable mining", + Category: flags.MinerCategory, + } ) // showDeprecated displays deprecated flags that will be soon removed from the codebase. diff --git a/consensus/consensus.go b/consensus/consensus.go index 3a2c2d2229..5cc052cb0f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -119,11 +119,3 @@ type Engine interface { // Close terminates any background threads maintained by the consensus engine. Close() error } - -// PoW is a consensus engine based on proof-of-work. -type PoW interface { - Engine - - // Hashrate returns the current mining hashrate of a PoW consensus engine. - Hashrate() float64 -} diff --git a/console/console.go b/console/console.go index cdee53684e..5acb4cdccb 100644 --- a/console/console.go +++ b/console/console.go @@ -325,9 +325,6 @@ func (c *Console) Welcome() { // Print some generic Geth metadata if res, err := c.jsre.Run(` var message = "instance: " + web3.version.node + "\n"; - try { - message += "coinbase: " + eth.coinbase + "\n"; - } catch (err) {} message += "at block: " + eth.blockNumber + " (" + new Date(1000 * eth.getBlock(eth.blockNumber).timestamp) + ")\n"; try { message += " datadir: " + admin.datadir + "\n"; diff --git a/console/console_test.go b/console/console_test.go index a13be6a99d..4c30c1b49c 100644 --- a/console/console_test.go +++ b/console/console_test.go @@ -96,7 +96,7 @@ func newTester(t *testing.T, confOverride func(*ethconfig.Config)) *tester { ethConf := ðconfig.Config{ Genesis: core.DeveloperGenesisBlock(11_500_000, nil), Miner: miner.Config{ - Etherbase: common.HexToAddress(testAddress), + PendingFeeRecipient: common.HexToAddress(testAddress), }, } if confOverride != nil { @@ -167,9 +167,6 @@ func TestWelcome(t *testing.T) { if want := fmt.Sprintf("instance: %s", testInstance); !strings.Contains(output, want) { t.Fatalf("console output missing instance: have\n%s\nwant also %s", output, want) } - if want := fmt.Sprintf("coinbase: %s", testAddress); !strings.Contains(output, want) { - t.Fatalf("console output missing coinbase: have\n%s\nwant also %s", output, want) - } if want := "at block: 0"; !strings.Contains(output, want) { t.Fatalf("console output missing sync status: have\n%s\nwant also %s", output, want) } diff --git a/eth/api.go b/eth/api.go deleted file mode 100644 index 44e934fd04..0000000000 --- a/eth/api.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package eth - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" -) - -// EthereumAPI provides an API to access Ethereum full node-related information. -type EthereumAPI struct { - e *Ethereum -} - -// NewEthereumAPI creates a new Ethereum protocol API for full nodes. -func NewEthereumAPI(e *Ethereum) *EthereumAPI { - return &EthereumAPI{e} -} - -// Etherbase is the address that mining rewards will be sent to. -func (api *EthereumAPI) Etherbase() (common.Address, error) { - return api.e.Etherbase() -} - -// Coinbase is the address that mining rewards will be sent to (alias for Etherbase). -func (api *EthereumAPI) Coinbase() (common.Address, error) { - return api.Etherbase() -} - -// Hashrate returns the POW hashrate. -func (api *EthereumAPI) Hashrate() hexutil.Uint64 { - return hexutil.Uint64(api.e.Miner().Hashrate()) -} - -// Mining returns an indication if this node is currently mining. -func (api *EthereumAPI) Mining() bool { - return api.e.IsMining() -} diff --git a/eth/api_backend.go b/eth/api_backend.go index 65adccd851..48c46447c5 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -37,7 +37,6 @@ import ( "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -67,7 +66,7 @@ func (b *EthAPIBackend) SetHead(number uint64) { func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) { // Pending block is only known by the miner if number == rpc.PendingBlockNumber { - block := b.eth.miner.PendingBlock() + block, _, _ := b.eth.miner.Pending() if block == nil { return nil, errors.New("pending block is not available") } @@ -118,7 +117,7 @@ func (b *EthAPIBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*ty func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) { // Pending block is only known by the miner if number == rpc.PendingBlockNumber { - block := b.eth.miner.PendingBlock() + block, _, _ := b.eth.miner.Pending() if block == nil { return nil, errors.New("pending block is not available") } @@ -182,14 +181,14 @@ func (b *EthAPIBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash r return nil, errors.New("invalid arguments; neither block nor hash specified") } -func (b *EthAPIBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { - return b.eth.miner.PendingBlockAndReceipts() +func (b *EthAPIBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) { + return b.eth.miner.Pending() } func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) { // Pending state is only known by the miner if number == rpc.PendingBlockNumber { - block, state := b.eth.miner.Pending() + block, _, state := b.eth.miner.Pending() if block == nil || state == nil { return nil, nil, errors.New("pending state is not available") } @@ -267,10 +266,6 @@ func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) } -func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { - return b.eth.miner.SubscribePendingLogs(ch) -} - func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.eth.BlockChain().SubscribeChainEvent(ch) } @@ -421,14 +416,6 @@ func (b *EthAPIBackend) CurrentHeader() *types.Header { return b.eth.blockchain.CurrentHeader() } -func (b *EthAPIBackend) Miner() *miner.Miner { - return b.eth.Miner() -} - -func (b *EthAPIBackend) StartMining() error { - return b.eth.StartMining() -} - func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtBlock(ctx, block, reexec, base, readOnly, preferDisk) } diff --git a/eth/api_debug.go b/eth/api_debug.go index 05010a3969..d5e4dda140 100644 --- a/eth/api_debug.go +++ b/eth/api_debug.go @@ -56,7 +56,7 @@ func (api *DebugAPI) DumpBlock(blockNr rpc.BlockNumber) (state.Dump, error) { // If we're dumping the pending state, we need to request // both the pending block as well as the pending state from // the miner and operate on those - _, stateDb := api.eth.miner.Pending() + _, _, stateDb := api.eth.miner.Pending() if stateDb == nil { return state.Dump{}, errors.New("pending state is not available") } @@ -142,7 +142,7 @@ func (api *DebugAPI) AccountRange(blockNrOrHash rpc.BlockNumberOrHash, start hex // If we're dumping the pending state, we need to request // both the pending block as well as the pending state from // the miner and operate on those - _, stateDb = api.eth.miner.Pending() + _, _, stateDb = api.eth.miner.Pending() if stateDb == nil { return state.Dump{}, errors.New("pending state is not available") } diff --git a/eth/api_miner.go b/eth/api_miner.go index 764d0ae5e2..8c96f4c54a 100644 --- a/eth/api_miner.go +++ b/eth/api_miner.go @@ -18,9 +18,7 @@ package eth import ( "math/big" - "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" ) @@ -34,21 +32,6 @@ func NewMinerAPI(e *Ethereum) *MinerAPI { return &MinerAPI{e} } -// Start starts the miner with the given number of threads. If threads is nil, -// the number of workers started is equal to the number of logical CPUs that are -// usable by this process. If mining is already running, this method adjust the -// number of threads allowed to use and updates the minimum price required by the -// transaction pool. -func (api *MinerAPI) Start() error { - return api.e.StartMining() -} - -// Stop terminates the miner, both at the consensus engine level as well as at -// the block creation level. -func (api *MinerAPI) Stop() { - api.e.StopMining() -} - // SetExtra sets the extra data string that is included when this miner mines a block. func (api *MinerAPI) SetExtra(extra string) (bool, error) { if err := api.e.Miner().SetExtra([]byte(extra)); err != nil { @@ -73,14 +56,3 @@ func (api *MinerAPI) SetGasLimit(gasLimit hexutil.Uint64) bool { api.e.Miner().SetGasCeil(uint64(gasLimit)) return true } - -// SetEtherbase sets the etherbase of the miner. -func (api *MinerAPI) SetEtherbase(etherbase common.Address) bool { - api.e.SetEtherbase(etherbase) - return true -} - -// SetRecommitInterval updates the interval for miner sealing work recommitting. -func (api *MinerAPI) SetRecommitInterval(interval int) { - api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond) -} diff --git a/eth/backend.go b/eth/backend.go index f6c1637aca..81d84028a5 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -28,8 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/beacon" - "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" @@ -88,9 +86,8 @@ type Ethereum struct { APIBackend *EthAPIBackend - miner *miner.Miner - gasPrice *big.Int - etherbase common.Address + miner *miner.Miner + gasPrice *big.Int networkID uint64 netRPCService *ethapi.NetAPI @@ -164,7 +161,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { closeBloomHandler: make(chan struct{}), networkID: networkID, gasPrice: config.Miner.GasPrice, - etherbase: config.Miner.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), p2pServer: stack.Server(), @@ -211,7 +207,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.OverrideVerkle != nil { overrides.OverrideVerkle = config.OverrideVerkle } - eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, config.Genesis, &overrides, eth.engine, vmConfig, eth.shouldPreserve, &config.TransactionHistory) + // TODO (MariusVanDerWijden) get rid of shouldPreserve in a follow-up PR + shouldPreserve := func(header *types.Header) bool { + return false + } + eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, config.Genesis, &overrides, eth.engine, vmConfig, shouldPreserve, &config.TransactionHistory) if err != nil { return nil, err } @@ -247,7 +247,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } - eth.miner = miner.New(eth, &config.Miner, eth.blockchain.Config(), eth.EventMux(), eth.engine, eth.isLocalBlock) + eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} @@ -313,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API { // Append all the local APIs and return return append(apis, []rpc.API{ { - Namespace: "eth", - Service: NewEthereumAPI(s), - }, { Namespace: "miner", Service: NewMinerAPI(s), }, { @@ -338,138 +335,6 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { s.blockchain.ResetWithGenesisBlock(gb) } -func (s *Ethereum) Etherbase() (eb common.Address, err error) { - s.lock.RLock() - etherbase := s.etherbase - s.lock.RUnlock() - - if etherbase != (common.Address{}) { - return etherbase, nil - } - return common.Address{}, errors.New("etherbase must be explicitly specified") -} - -// isLocalBlock checks whether the specified block is mined -// by local miner accounts. -// -// We regard two types of accounts as local miner account: etherbase -// and accounts specified via `txpool.locals` flag. -func (s *Ethereum) isLocalBlock(header *types.Header) bool { - author, err := s.engine.Author(header) - if err != nil { - log.Warn("Failed to retrieve block author", "number", header.Number.Uint64(), "hash", header.Hash(), "err", err) - return false - } - // Check whether the given address is etherbase. - s.lock.RLock() - etherbase := s.etherbase - s.lock.RUnlock() - if author == etherbase { - return true - } - // Check whether the given address is specified by `txpool.local` - // CLI flag. - for _, account := range s.config.TxPool.Locals { - if account == author { - return true - } - } - return false -} - -// shouldPreserve checks whether we should preserve the given block -// during the chain reorg depending on whether the author of block -// is a local account. -func (s *Ethereum) shouldPreserve(header *types.Header) bool { - // The reason we need to disable the self-reorg preserving for clique - // is it can be probable to introduce a deadlock. - // - // e.g. If there are 7 available signers - // - // r1 A - // r2 B - // r3 C - // r4 D - // r5 A [X] F G - // r6 [X] - // - // In the round5, the in-turn signer E is offline, so the worst case - // is A, F and G sign the block of round5 and reject the block of opponents - // and in the round6, the last available signer B is offline, the whole - // network is stuck. - if _, ok := s.engine.(*clique.Clique); ok { - return false - } - return s.isLocalBlock(header) -} - -// SetEtherbase sets the mining reward address. -func (s *Ethereum) SetEtherbase(etherbase common.Address) { - s.lock.Lock() - s.etherbase = etherbase - s.lock.Unlock() - - s.miner.SetEtherbase(etherbase) -} - -// StartMining starts the miner with the given number of CPU threads. If mining -// is already running, this method adjust the number of threads allowed to use -// and updates the minimum price required by the transaction pool. -func (s *Ethereum) StartMining() error { - // If the miner was not running, initialize it - if !s.IsMining() { - // Propagate the initial price point to the transaction pool - s.lock.RLock() - price := s.gasPrice - s.lock.RUnlock() - s.txPool.SetGasTip(price) - - // Configure the local mining address - eb, err := s.Etherbase() - if err != nil { - log.Error("Cannot start mining without etherbase", "err", err) - return fmt.Errorf("etherbase missing: %v", err) - } - var cli *clique.Clique - if c, ok := s.engine.(*clique.Clique); ok { - cli = c - } else if cl, ok := s.engine.(*beacon.Beacon); ok { - if c, ok := cl.InnerEngine().(*clique.Clique); ok { - cli = c - } - } - if cli != nil { - wallet, err := s.accountManager.Find(accounts.Account{Address: eb}) - if wallet == nil || err != nil { - log.Error("Etherbase account unavailable locally", "err", err) - return fmt.Errorf("signer missing: %v", err) - } - cli.Authorize(eb, wallet.SignData) - } - // If mining is started, we can disable the transaction rejection mechanism - // introduced to speed sync times. - s.handler.enableSyncedFeatures() - - go s.miner.Start() - } - return nil -} - -// StopMining terminates the miner, both at the consensus engine level as well as -// at the block creation level. -func (s *Ethereum) StopMining() { - // Update the thread count within the consensus engine - type threaded interface { - SetThreads(threads int) - } - if th, ok := s.engine.(threaded); ok { - th.SetThreads(-1) - } - // Stop the block creating itself - s.miner.Stop() -} - -func (s *Ethereum) IsMining() bool { return s.miner.Mining() } func (s *Ethereum) Miner() *miner.Miner { return s.miner } func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } @@ -531,7 +396,6 @@ func (s *Ethereum) Stop() error { s.bloomIndexer.Close() close(s.closeBloomHandler) s.txPool.Close() - s.miner.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index a82e2d6cf6..a88996744c 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -447,7 +447,9 @@ func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block) t.Fatal("can't create node:", err) } - ethcfg := ðconfig.Config{Genesis: genesis, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256} + mcfg := miner.DefaultConfig + mcfg.PendingFeeRecipient = testAddr + ethcfg := ðconfig.Config{Genesis: genesis, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256, Miner: mcfg} ethservice, err := eth.New(n, ethcfg) if err != nil { t.Fatal("can't create eth service:", err) @@ -460,7 +462,6 @@ func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block) t.Fatal("can't import test blocks:", err) } - ethservice.SetEtherbase(testAddr) ethservice.SetSynced() return n, ethservice } diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index 6fa97ad87a..df682b49d9 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" @@ -48,7 +49,7 @@ func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node. t.Fatal("can't create node:", err) } - ethcfg := ðconfig.Config{Genesis: genesis, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256} + ethcfg := ðconfig.Config{Genesis: genesis, SyncMode: downloader.FullSync, TrieTimeout: time.Minute, TrieDirtyCache: 256, TrieCleanCache: 256, Miner: miner.DefaultConfig} ethservice, err := eth.New(n, ethcfg) if err != nil { t.Fatal("can't create eth service:", err) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 83e3284a2b..f2b92d5a99 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -333,7 +333,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ // pendingLogs returns the logs matching the filter criteria within the pending block. func (f *Filter) pendingLogs() []*types.Log { - block, receipts := f.sys.backend.PendingBlockAndReceipts() + block, receipts, _ := f.sys.backend.Pending() if block == nil || receipts == nil { return nil } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index f98a1f84ce..c32b837eb4 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -62,7 +63,7 @@ type Backend interface { GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) - PendingBlockAndReceipts() (*types.Block, types.Receipts) + Pending() (*types.Block, types.Receipts, *state.StateDB) CurrentHeader() *types.Header ChainConfig() *params.ChainConfig @@ -70,7 +71,6 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) @@ -198,20 +198,18 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txsSub event.Subscription // Subscription for new transaction event - logsSub event.Subscription // Subscription for new log event - rmLogsSub event.Subscription // Subscription for removed log event - pendingLogsSub event.Subscription // Subscription for pending log event - chainSub event.Subscription // Subscription for new chain event + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - pendingLogsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -222,16 +220,15 @@ type EventSystem struct { // or by stopping the given mux. func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m := &EventSystem{ - sys: sys, - backend: sys.backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - pendingLogsCh: make(chan []*types.Log, logsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + sys: sys, + backend: sys.backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events @@ -239,10 +236,9 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) - m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil { log.Crit("Subscribe for event system failed") } @@ -434,12 +430,12 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { } } -func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) { - if len(ev) == 0 { +func (es *EventSystem) handlePendingLogs(filters filterIndex, logs []*types.Log) { + if len(logs) == 0 { return } for _, f := range filters[PendingLogsSubscription] { - matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + matchedLogs := filterLogs(logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) if len(matchedLogs) > 0 { f.logs <- matchedLogs } @@ -550,7 +546,6 @@ func (es *EventSystem) eventLoop() { es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() - es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() }() @@ -567,10 +562,29 @@ func (es *EventSystem) eventLoop() { es.handleLogs(index, ev) case ev := <-es.rmLogsCh: es.handleLogs(index, ev.Logs) - case ev := <-es.pendingLogsCh: - es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) + // If we have no pending log subscription, + // we don't need to collect any pending logs. + if len(index[PendingLogsSubscription]) == 0 { + continue + } + + // Pull the pending logs if there is a new chain head. + pendingBlock, pendingReceipts, _ := es.backend.Pending() + if pendingBlock == nil || pendingReceipts == nil { + continue + } + if pendingBlock.ParentHash() != ev.Block.Hash() { + continue + } + var logs []*types.Log + for _, receipt := range pendingReceipts { + if len(receipt.Logs) > 0 { + logs = append(logs, receipt.Logs...) + } + } + es.handlePendingLogs(index, logs) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 99c012cc84..6238c97735 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -48,7 +49,6 @@ type testBackend struct { txFeed event.Feed logsFeed event.Feed rmLogsFeed event.Feed - pendingLogsFeed event.Feed chainFeed event.Feed pendingBlock *types.Block pendingReceipts types.Receipts @@ -125,8 +125,8 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint return logs, nil } -func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { - return b.pendingBlock, b.pendingReceipts +func (b *testBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) { + return b.pendingBlock, b.pendingReceipts, nil } func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { @@ -141,10 +141,6 @@ func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return b.logsFeed.Subscribe(ch) } -func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { - return b.pendingLogsFeed.Subscribe(ch) -} - func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.chainFeed.Subscribe(ch) } @@ -180,6 +176,20 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { + b.pendingBlock = block + b.pendingReceipts = receipts +} + +func (b *testBackend) notifyPending(logs []*types.Log) { + genesis := &core.Genesis{ + Config: params.TestChainConfig, + } + _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(i int, b *core.BlockGen) {}) + b.setPending(blocks[1], []*types.Receipt{{Logs: logs}}) + b.chainFeed.Send(core.ChainEvent{Block: blocks[0]}) +} + func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { backend := &testBackend{db: db} sys := NewFilterSystem(backend, cfg) @@ -203,7 +213,7 @@ func TestBlockSubscription(t *testing.T) { BaseFee: big.NewInt(params.InitialBaseFee), } _, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + chainEvents []core.ChainEvent ) for _, blk := range chain { @@ -386,7 +396,7 @@ func TestLogFilterCreation(t *testing.T) { {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false}, // from block "higher" than to block {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false}, - // topics more then 4 + // topics more than 4 {FilterCriteria{Topics: [][]common.Hash{{}, {}, {}, {}, {}}}, false}, } ) @@ -546,9 +556,9 @@ func TestLogFilter(t *testing.T) { if nsend := backend.logsFeed.Send(allLogs); nsend == 0 { t.Fatal("Logs event not delivered") } - if nsend := backend.pendingLogsFeed.Send(allLogs); nsend == 0 { - t.Fatal("Pending logs event not delivered") - } + + // set pending logs + backend.notifyPending(allLogs) for i, tt := range testCases { var fetched []*types.Log @@ -754,10 +764,12 @@ func TestPendingLogsSubscription(t *testing.T) { }() } - // raise events - for _, ev := range allLogs { - backend.pendingLogsFeed.Send(ev) + // set pending logs + var flattenLogs []*types.Log + for _, logs := range allLogs { + flattenLogs = append(flattenLogs, logs...) } + backend.notifyPending(flattenLogs) for i := range testCases { err := <-testCases[i].err diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 659ca5ce19..48aaa584db 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -109,8 +109,8 @@ func BenchmarkFilters(b *testing.B) { func TestFilters(t *testing.T) { var ( - db = rawdb.NewMemoryDatabase() - _, sys = newTestFilterSystem(t, db, Config{}) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) // Sender account key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) @@ -277,8 +277,7 @@ func TestFilters(t *testing.T) { }), signer, key1) gen.AddTx(tx) }) - sys.backend.(*testBackend).pendingBlock = pchain[0] - sys.backend.(*testBackend).pendingReceipts = preceipts[0] + backend.setPending(pchain[0], preceipts[0]) for i, tc := range []struct { f *Filter diff --git a/eth/gasprice/feehistory.go b/eth/gasprice/feehistory.go index d657eb6d99..8ab57294b7 100644 --- a/eth/gasprice/feehistory.go +++ b/eth/gasprice/feehistory.go @@ -160,7 +160,7 @@ func (oracle *Oracle) resolveBlockRange(ctx context.Context, reqEnd rpc.BlockNum ) switch reqEnd { case rpc.PendingBlockNumber: - if pendingBlock, pendingReceipts = oracle.backend.PendingBlockAndReceipts(); pendingBlock != nil { + if pendingBlock, pendingReceipts, _ = oracle.backend.Pending(); pendingBlock != nil { resolved = pendingBlock.Header() } else { // Pending block not supported by backend, process only until latest block. diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go index b719649811..3fa70e41a0 100644 --- a/eth/gasprice/gasprice.go +++ b/eth/gasprice/gasprice.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -54,7 +55,7 @@ type OracleBackend interface { HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) - PendingBlockAndReceipts() (*types.Block, types.Receipts) + Pending() (*types.Block, types.Receipts, *state.StateDB) ChainConfig() *params.ChainConfig SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription } diff --git a/eth/gasprice/gasprice_test.go b/eth/gasprice/gasprice_test.go index 79217502f7..1d2e02cde6 100644 --- a/eth/gasprice/gasprice_test.go +++ b/eth/gasprice/gasprice_test.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" @@ -97,12 +98,13 @@ func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types. return b.chain.GetReceiptsByHash(hash), nil } -func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { +func (b *testBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) { if b.pending { block := b.chain.GetBlockByNumber(testHead + 1) - return block, b.chain.GetReceiptsByHash(block.Hash()) + state, _ := b.chain.StateAt(block.Root()) + return block, b.chain.GetReceiptsByHash(block.Hash()), state } - return nil, nil + return nil, nil, nil } func (b *testBackend) ChainConfig() *params.ChainConfig { diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index 0d2675f8d1..2f3229cedc 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -602,17 +602,22 @@ func testAtFunctions(t *testing.T, client *rpc.Client) { } // send a transaction for some interesting pending status + // and wait for the transaction to be included in the pending block sendTransaction(ec) - time.Sleep(100 * time.Millisecond) - // Check pending transaction count - pending, err := ec.PendingTransactionCount(context.Background()) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if pending != 1 { - t.Fatalf("unexpected pending, wanted 1 got: %v", pending) + // wait for the transaction to be included in the pending block + for { + // Check pending transaction count + pending, err := ec.PendingTransactionCount(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if pending == 1 { + break + } + time.Sleep(100 * time.Millisecond) } + // Query balance balance, err := ec.BalanceAt(context.Background(), testAddr, nil) if err != nil { @@ -737,7 +742,7 @@ func sendTransaction(ec *Client) error { if err != nil { return err } - nonce, err := ec.PendingNonceAt(context.Background(), testAddr) + nonce, err := ec.NonceAt(context.Background(), testAddr, nil) if err != nil { return err } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 61ceec443e..6e71666ec1 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -39,7 +39,6 @@ import ( ethproto "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" @@ -80,13 +79,6 @@ type fullNodeBackend interface { SuggestGasTipCap(ctx context.Context) (*big.Int, error) } -// miningNodeBackend encompasses the functionality necessary for a mining node -// reporting to ethstats -type miningNodeBackend interface { - fullNodeBackend - Miner() *miner.Miner -} - // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. type Service struct { @@ -777,30 +769,21 @@ func (s *Service) reportPending(conn *connWrapper) error { type nodeStats struct { Active bool `json:"active"` Syncing bool `json:"syncing"` - Mining bool `json:"mining"` - Hashrate int `json:"hashrate"` Peers int `json:"peers"` GasPrice int `json:"gasPrice"` Uptime int `json:"uptime"` } -// reportStats retrieves various stats about the node at the networking and -// mining layer and reports it to the stats server. +// reportStats retrieves various stats about the node at the networking layer +// and reports it to the stats server. func (s *Service) reportStats(conn *connWrapper) error { - // Gather the syncing and mining infos from the local miner instance + // Gather the syncing infos from the local miner instance var ( - mining bool - hashrate int syncing bool gasprice int ) // check if backend is a full node if fullBackend, ok := s.backend.(fullNodeBackend); ok { - if miningBackend, ok := s.backend.(miningNodeBackend); ok { - mining = miningBackend.Miner().Mining() - hashrate = int(miningBackend.Miner().Hashrate()) - } - sync := fullBackend.SyncProgress() syncing = !sync.Done() @@ -820,8 +803,6 @@ func (s *Service) reportStats(conn *connWrapper) error { "id": s.node, "stats": &nodeStats{ Active: true, - Mining: mining, - Hashrate: hashrate, Peers: s.server.PeerCount(), GasPrice: gasprice, Syncing: syncing, diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 1d0383daad..3f69f86144 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -547,7 +547,7 @@ func (b testBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOr } panic("only implemented for number") } -func (b testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { panic("implement me") } +func (b testBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) { panic("implement me") } func (b testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { header, err := b.HeaderByHash(ctx, hash) if header == nil || err != nil { @@ -615,9 +615,6 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { panic("implement me") } -func (b testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { - panic("implement me") -} func (b testBackend) BloomStatus() (uint64, uint64) { panic("implement me") } func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { panic("implement me") diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 5f408ba20b..fd2f5699ea 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -65,7 +65,7 @@ type Backend interface { BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) - PendingBlockAndReceipts() (*types.Block, types.Receipts) + Pending() (*types.Block, types.Receipts, *state.StateDB) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) GetTd(ctx context.Context, hash common.Hash) *big.Int GetEVM(ctx context.Context, msg *core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config, blockCtx *vm.BlockContext) *vm.EVM @@ -94,7 +94,6 @@ type Backend interface { GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 1b1634b250..24ecb1dee4 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -358,7 +358,7 @@ func (b *backendMock) StateAndHeaderByNumber(ctx context.Context, number rpc.Blo func (b *backendMock) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) { return nil, nil, nil } -func (b *backendMock) PendingBlockAndReceipts() (*types.Block, types.Receipts) { return nil, nil } +func (b *backendMock) Pending() (*types.Block, types.Receipts, *state.StateDB) { return nil, nil, nil } func (b *backendMock) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { return nil, nil } @@ -396,9 +396,6 @@ func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscr func (b *backendMock) BloomStatus() (uint64, uint64) { return 0, 0 } func (b *backendMock) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {} func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } -func (b *backendMock) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { - return nil -} func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return nil } diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index b86b5909d2..1da7d737dd 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -649,20 +649,6 @@ const MinerJs = ` web3._extend({ property: 'miner', methods: [ - new web3._extend.Method({ - name: 'start', - call: 'miner_start', - }), - new web3._extend.Method({ - name: 'stop', - call: 'miner_stop' - }), - new web3._extend.Method({ - name: 'setEtherbase', - call: 'miner_setEtherbase', - params: 1, - inputFormatter: [web3._extend.formatters.inputAddressFormatter] - }), new web3._extend.Method({ name: 'setExtra', call: 'miner_setExtra', @@ -680,15 +666,6 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.utils.fromDecimal] }), - new web3._extend.Method({ - name: 'setRecommitInterval', - call: 'miner_setRecommitInterval', - params: 1, - }), - new web3._extend.Method({ - name: 'getHashrate', - call: 'miner_getHashrate' - }), ], properties: [] }); diff --git a/miner/miner.go b/miner/miner.go index 58bb71b557..430efcb2fc 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -30,9 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/downloader" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) @@ -45,207 +42,124 @@ type Backend interface { // Config is the configuration parameters of mining. type Config struct { - Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards - ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner - GasFloor uint64 // Target gas floor for mined blocks. - GasCeil uint64 // Target gas ceiling for mined blocks. - GasPrice *big.Int // Minimum gas price for mining a transaction - Recommit time.Duration // The time interval for miner to re-create mining work. - - NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload + Etherbase common.Address `toml:"-"` // Deprecated + PendingFeeRecipient common.Address `toml:"-"` // Address for pending block rewards. + ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner + GasCeil uint64 // Target gas ceiling for mined blocks. + GasPrice *big.Int // Minimum gas price for mining a transaction + Recommit time.Duration // The time interval for miner to re-create mining work. } // DefaultConfig contains default settings for miner. var DefaultConfig = Config{ - GasCeil: 30000000, + GasCeil: 30_000_000, GasPrice: big.NewInt(params.GWei), // The default recommit time is chosen as two seconds since // consensus-layer usually will wait a half slot of time(6s) // for payload generation. It should be enough for Geth to // run 3 rounds. - Recommit: 2 * time.Second, - NewPayloadTimeout: 2 * time.Second, + Recommit: 2 * time.Second, } -// Miner creates blocks and searches for proof-of-work values. +// Miner is the main object which takes care of submitting new work to consensus +// engine and gathering the sealing result. type Miner struct { - mux *event.TypeMux - eth Backend - engine consensus.Engine - exitCh chan struct{} - startCh chan struct{} - stopCh chan struct{} - worker *worker - - wg sync.WaitGroup -} - -func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(header *types.Header) bool) *Miner { - miner := &Miner{ - mux: mux, - eth: eth, - engine: engine, - exitCh: make(chan struct{}), - startCh: make(chan struct{}), - stopCh: make(chan struct{}), - worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), - } - miner.wg.Add(1) - go miner.update() - return miner -} - -// update keeps track of the downloader events. Please be aware that this is a one shot type of update loop. -// It's entered once and as soon as `Done` or `Failed` has been broadcasted the events are unregistered and -// the loop is exited. This to prevent a major security vuln where external parties can DOS you with blocks -// and halt your mining operation for as long as the DOS continues. -func (miner *Miner) update() { - defer miner.wg.Done() - - events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) - defer func() { - if !events.Closed() { - events.Unsubscribe() - } - }() - - shouldStart := false - canStart := true - dlEventCh := events.Chan() - for { - select { - case ev := <-dlEventCh: - if ev == nil { - // Unsubscription done, stop listening - dlEventCh = nil - continue - } - switch ev.Data.(type) { - case downloader.StartEvent: - wasMining := miner.Mining() - miner.worker.stop() - canStart = false - if wasMining { - // Resume mining after sync was finished - shouldStart = true - log.Info("Mining aborted due to sync") - } - miner.worker.syncing.Store(true) - - case downloader.FailedEvent: - canStart = true - if shouldStart { - miner.worker.start() - } - miner.worker.syncing.Store(false) - - case downloader.DoneEvent: - canStart = true - if shouldStart { - miner.worker.start() - } - miner.worker.syncing.Store(false) - - // Stop reacting to downloader events - events.Unsubscribe() - } - case <-miner.startCh: - if canStart { - miner.worker.start() - } - shouldStart = true - case <-miner.stopCh: - shouldStart = false - miner.worker.stop() - case <-miner.exitCh: - miner.worker.close() - return - } + confMu sync.RWMutex // The lock used to protect the config fields: GasCeil, GasTip and Extradata + config *Config + chainConfig *params.ChainConfig + engine consensus.Engine + txpool *txpool.TxPool + chain *core.BlockChain + pending *pending + pendingMu sync.Mutex // Lock protects the pending block +} + +// New creates a new miner with provided config. +func New(eth Backend, config Config, engine consensus.Engine) *Miner { + return &Miner{ + config: &config, + chainConfig: eth.BlockChain().Config(), + engine: engine, + txpool: eth.TxPool(), + chain: eth.BlockChain(), + pending: &pending{}, } } -func (miner *Miner) Start() { - miner.startCh <- struct{}{} -} - -func (miner *Miner) Stop() { - miner.stopCh <- struct{}{} -} - -func (miner *Miner) Close() { - close(miner.exitCh) - miner.wg.Wait() -} - -func (miner *Miner) Mining() bool { - return miner.worker.isRunning() -} - -func (miner *Miner) Hashrate() uint64 { - if pow, ok := miner.engine.(consensus.PoW); ok { - return uint64(pow.Hashrate()) +// Pending returns the currently pending block and associated receipts, logs +// and statedb. The returned values can be nil in case the pending block is +// not initialized. +func (miner *Miner) Pending() (*types.Block, types.Receipts, *state.StateDB) { + pending := miner.getPending() + if pending == nil { + return nil, nil, nil } - return 0 + return pending.block, pending.receipts, pending.stateDB.Copy() } +// SetExtra sets the content used to initialize the block extra field. func (miner *Miner) SetExtra(extra []byte) error { if uint64(len(extra)) > params.MaximumExtraDataSize { return fmt.Errorf("extra exceeds max length. %d > %v", len(extra), params.MaximumExtraDataSize) } - miner.worker.setExtra(extra) + miner.confMu.Lock() + miner.config.ExtraData = extra + miner.confMu.Unlock() return nil } -func (miner *Miner) SetGasTip(tip *big.Int) error { - miner.worker.setGasTip(tip) - return nil -} - -// SetRecommitInterval sets the interval for sealing work resubmitting. -func (miner *Miner) SetRecommitInterval(interval time.Duration) { - miner.worker.setRecommitInterval(interval) -} - -// Pending returns the currently pending block and associated state. The returned -// values can be nil in case the pending block is not initialized -func (miner *Miner) Pending() (*types.Block, *state.StateDB) { - return miner.worker.pending() -} - -// PendingBlock returns the currently pending block. The returned block can be -// nil in case the pending block is not initialized. -// -// Note, to access both the pending block and the pending state -// simultaneously, please use Pending(), as the pending state can -// change between multiple method calls -func (miner *Miner) PendingBlock() *types.Block { - return miner.worker.pendingBlock() -} - -// PendingBlockAndReceipts returns the currently pending block and corresponding receipts. -// The returned values can be nil in case the pending block is not initialized. -func (miner *Miner) PendingBlockAndReceipts() (*types.Block, types.Receipts) { - return miner.worker.pendingBlockAndReceipts() -} - -func (miner *Miner) SetEtherbase(addr common.Address) { - miner.worker.setEtherbase(addr) -} - // SetGasCeil sets the gaslimit to strive for when mining blocks post 1559. // For pre-1559 blocks, it sets the ceiling. func (miner *Miner) SetGasCeil(ceil uint64) { - miner.worker.setGasCeil(ceil) + miner.confMu.Lock() + miner.config.GasCeil = ceil + miner.confMu.Unlock() } -// SubscribePendingLogs starts delivering logs from pending transactions -// to the given channel. -func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { - return miner.worker.pendingLogsFeed.Subscribe(ch) +// SetGasTip sets the minimum gas tip for inclusion. +func (miner *Miner) SetGasTip(tip *big.Int) error { + miner.confMu.Lock() + miner.config.GasPrice = tip + miner.confMu.Unlock() + return nil } // BuildPayload builds the payload according to the provided parameters. func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) { - return miner.worker.buildPayload(args) + return miner.buildPayload(args) +} + +// getPending retrieves the pending block based on the current head block. +// The result might be nil if pending generation is failed. +func (miner *Miner) getPending() *newPayloadResult { + header := miner.chain.CurrentHeader() + miner.pendingMu.Lock() + defer miner.pendingMu.Unlock() + if cached := miner.pending.resolve(header.Hash()); cached != nil { + return cached + } + + var ( + timestamp = uint64(time.Now().Unix()) + withdrawal types.Withdrawals + ) + if miner.chainConfig.IsShanghai(new(big.Int).Add(header.Number, big.NewInt(1)), timestamp) { + withdrawal = []*types.Withdrawal{} + } + ret := miner.generateWork(&generateParams{ + timestamp: timestamp, + forceTime: false, + parentHash: header.Hash(), + coinbase: miner.config.PendingFeeRecipient, + random: common.Hash{}, + withdrawals: withdrawal, + beaconRoot: nil, + noTxs: false, + }) + if ret.err != nil { + return nil + } + miner.pending.update(header.Hash(), ret) + return ret } diff --git a/miner/miner_test.go b/miner/miner_test.go index 5907fb4464..7c39564240 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -18,10 +18,9 @@ package miner import ( - "errors" "math/big" + "sync" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/clique" @@ -33,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" @@ -60,10 +58,6 @@ func (m *mockBackend) TxPool() *txpool.TxPool { return m.txPool } -func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { - return nil, errors.New("not supported") -} - type testBlockChain struct { root common.Hash config *params.ChainConfig @@ -99,171 +93,18 @@ func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) return bc.chainHeadFeed.Subscribe(ch) } -func TestMiner(t *testing.T) { - t.Parallel() - miner, mux, cleanup := createMiner(t) - defer cleanup(false) - - miner.Start() - waitForMiningState(t, miner, true) - // Start the downloader - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, false) - // Stop the downloader and wait for the update loop to run - mux.Post(downloader.DoneEvent{}) - waitForMiningState(t, miner, true) - - // Subsequent downloader events after a successful DoneEvent should not cause the - // miner to start or stop. This prevents a security vulnerability - // that would allow entities to present fake high blocks that would - // stop mining operations by causing a downloader sync - // until it was discovered they were invalid, whereon mining would resume. - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, true) - - mux.Post(downloader.FailedEvent{}) - waitForMiningState(t, miner, true) -} - -// TestMinerDownloaderFirstFails tests that mining is only -// permitted to run indefinitely once the downloader sees a DoneEvent (success). -// An initial FailedEvent should allow mining to stop on a subsequent -// downloader StartEvent. -func TestMinerDownloaderFirstFails(t *testing.T) { - t.Parallel() - miner, mux, cleanup := createMiner(t) - defer cleanup(false) - - miner.Start() - waitForMiningState(t, miner, true) - // Start the downloader - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, false) - - // Stop the downloader and wait for the update loop to run - mux.Post(downloader.FailedEvent{}) - waitForMiningState(t, miner, true) - - // Since the downloader hasn't yet emitted a successful DoneEvent, - // we expect the miner to stop on next StartEvent. - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, false) - - // Downloader finally succeeds. - mux.Post(downloader.DoneEvent{}) - waitForMiningState(t, miner, true) - - // Downloader starts again. - // Since it has achieved a DoneEvent once, we expect miner - // state to be unchanged. - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, true) - - mux.Post(downloader.FailedEvent{}) - waitForMiningState(t, miner, true) -} - -func TestMinerStartStopAfterDownloaderEvents(t *testing.T) { - t.Parallel() - miner, mux, cleanup := createMiner(t) - defer cleanup(false) - - miner.Start() - waitForMiningState(t, miner, true) - // Start the downloader - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, false) - - // Downloader finally succeeds. - mux.Post(downloader.DoneEvent{}) - waitForMiningState(t, miner, true) - - miner.Stop() - waitForMiningState(t, miner, false) - - miner.Start() - waitForMiningState(t, miner, true) - - miner.Stop() - waitForMiningState(t, miner, false) -} - -func TestStartWhileDownload(t *testing.T) { - t.Parallel() - miner, mux, cleanup := createMiner(t) - defer cleanup(false) - waitForMiningState(t, miner, false) - miner.Start() - waitForMiningState(t, miner, true) - // Stop the downloader and wait for the update loop to run - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, false) - // Starting the miner after the downloader should not work - miner.Start() - waitForMiningState(t, miner, false) -} - -func TestStartStopMiner(t *testing.T) { - t.Parallel() - miner, _, cleanup := createMiner(t) - defer cleanup(false) - waitForMiningState(t, miner, false) - miner.Start() - waitForMiningState(t, miner, true) - miner.Stop() - waitForMiningState(t, miner, false) -} - -func TestCloseMiner(t *testing.T) { - t.Parallel() - miner, _, cleanup := createMiner(t) - defer cleanup(true) - waitForMiningState(t, miner, false) - miner.Start() - waitForMiningState(t, miner, true) - // Terminate the miner and wait for the update loop to run - miner.Close() - waitForMiningState(t, miner, false) -} - -// TestMinerSetEtherbase checks that etherbase becomes set even if mining isn't -// possible at the moment -func TestMinerSetEtherbase(t *testing.T) { - t.Parallel() - miner, mux, cleanup := createMiner(t) - defer cleanup(false) - miner.Start() - waitForMiningState(t, miner, true) - // Start the downloader - mux.Post(downloader.StartEvent{}) - waitForMiningState(t, miner, false) - // Now user tries to configure proper mining address - miner.Start() - // Stop the downloader and wait for the update loop to run - mux.Post(downloader.DoneEvent{}) - waitForMiningState(t, miner, true) - - coinbase := common.HexToAddress("0xdeedbeef") - miner.SetEtherbase(coinbase) - if addr := miner.worker.etherbase(); addr != coinbase { - t.Fatalf("Unexpected etherbase want %x got %x", coinbase, addr) - } -} - -// waitForMiningState waits until either -// * the desired mining state was reached -// * a timeout was reached which fails the test -func waitForMiningState(t *testing.T, m *Miner, mining bool) { - t.Helper() - - var state bool - for i := 0; i < 100; i++ { - time.Sleep(10 * time.Millisecond) - if state = m.Mining(); state == mining { - return +func TestBuildPendingBlocks(t *testing.T) { + miner := createMiner(t) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + block, _, _ := miner.Pending() + if block == nil { + t.Error("Pending failed") } - } - t.Fatalf("Mining() == %t, want %t", state, mining) + }() + wg.Wait() } func minerTestGenesisBlock(period uint64, gasLimit uint64, faucet common.Address) *core.Genesis { @@ -294,10 +135,11 @@ func minerTestGenesisBlock(period uint64, gasLimit uint64, faucet common.Address }, } } -func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) { + +func createMiner(t *testing.T) *Miner { // Create Ethash config config := Config{ - Etherbase: common.HexToAddress("123456789"), + PendingFeeRecipient: common.HexToAddress("123456789"), } // Create chainConfig chainDB := rawdb.NewMemoryDatabase() @@ -320,18 +162,8 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) { pool := legacypool.New(testTxPoolConfig, blockchain) txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}) - backend := NewMockBackend(bc, txpool) - // Create event Mux - mux := new(event.TypeMux) // Create Miner - miner := New(backend, &config, chainConfig, mux, engine, nil) - cleanup := func(skipMiner bool) { - bc.Stop() - engine.Close() - txpool.Close() - if !skipMiner { - miner.Close() - } - } - return miner, mux, cleanup + backend := NewMockBackend(bc, txpool) + miner := New(backend, config, engine) + return miner } diff --git a/miner/payload_building.go b/miner/payload_building.go index 719736c479..cbdb82a642 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -46,7 +46,6 @@ type BuildPayloadArgs struct { // Id computes an 8-byte identifier by hashing the components of the payload arguments. func (args *BuildPayloadArgs) Id() engine.PayloadID { - // Hash hasher := sha256.New() hasher.Write(args.Parent[:]) binary.Write(hasher, binary.BigEndian, args.Timestamp) @@ -177,7 +176,7 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope { } // buildPayload builds the payload according to the provided parameters. -func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { +func (miner *Miner) buildPayload(args *BuildPayloadArgs) (*Payload, error) { // Build the initial version with no transaction included. It should be fast // enough to run. The empty payload can at least make sure there is something // to deliver for not missing slot. @@ -191,7 +190,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { beaconRoot: args.BeaconRoot, noTxs: true, } - empty := w.getSealingBlock(emptyParams) + empty := miner.generateWork(emptyParams) if empty.err != nil { return nil, empty.err } @@ -227,11 +226,11 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) { select { case <-timer.C: start := time.Now() - r := w.getSealingBlock(fullParams) + r := miner.generateWork(fullParams) if r.err == nil { payload.update(r, time.Since(start)) } - timer.Reset(w.recommit) + timer.Reset(miner.config.Recommit) case <-payload.stop: log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery") return diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index 708072b5ec..1728b9e5bd 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -17,26 +17,141 @@ package miner import ( + "math/big" "reflect" "testing" "time" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" ) +var ( + // Test chain configurations + testTxPoolConfig legacypool.Config + ethashChainConfig *params.ChainConfig + cliqueChainConfig *params.ChainConfig + + // Test accounts + testBankKey, _ = crypto.GenerateKey() + testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) + testBankFunds = big.NewInt(1000000000000000000) + + testUserKey, _ = crypto.GenerateKey() + testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey) + + // Test transactions + pendingTxs []*types.Transaction + newTxs []*types.Transaction + + testConfig = Config{ + PendingFeeRecipient: testBankAddress, + Recommit: time.Second, + GasCeil: params.GenesisGasLimit, + } +) + +func init() { + testTxPoolConfig = legacypool.DefaultConfig + testTxPoolConfig.Journal = "" + ethashChainConfig = new(params.ChainConfig) + *ethashChainConfig = *params.TestChainConfig + cliqueChainConfig = new(params.ChainConfig) + *cliqueChainConfig = *params.TestChainConfig + cliqueChainConfig.Clique = ¶ms.CliqueConfig{ + Period: 10, + Epoch: 30000, + } + + signer := types.LatestSigner(params.TestChainConfig) + tx1 := types.MustSignNewTx(testBankKey, signer, &types.AccessListTx{ + ChainID: params.TestChainConfig.ChainID, + Nonce: 0, + To: &testUserAddress, + Value: big.NewInt(1000), + Gas: params.TxGas, + GasPrice: big.NewInt(params.InitialBaseFee), + }) + pendingTxs = append(pendingTxs, tx1) + + tx2 := types.MustSignNewTx(testBankKey, signer, &types.LegacyTx{ + Nonce: 1, + To: &testUserAddress, + Value: big.NewInt(1000), + Gas: params.TxGas, + GasPrice: big.NewInt(params.InitialBaseFee), + }) + newTxs = append(newTxs, tx2) +} + +// testWorkerBackend implements worker.Backend interfaces and wraps all information needed during the testing. +type testWorkerBackend struct { + db ethdb.Database + txPool *txpool.TxPool + chain *core.BlockChain + genesis *core.Genesis +} + +func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, n int) *testWorkerBackend { + var gspec = &core.Genesis{ + Config: chainConfig, + Alloc: types.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, + } + switch e := engine.(type) { + case *clique.Clique: + gspec.ExtraData = make([]byte, 32+common.AddressLength+crypto.SignatureLength) + copy(gspec.ExtraData[32:32+common.AddressLength], testBankAddress.Bytes()) + e.Authorize(testBankAddress, func(account accounts.Account, s string, data []byte) ([]byte, error) { + return crypto.Sign(crypto.Keccak256(data), testBankKey) + }) + case *ethash.Ethash: + default: + t.Fatalf("unexpected consensus engine type: %T", engine) + } + chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec, nil, engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("core.NewBlockChain failed: %v", err) + } + pool := legacypool.New(testTxPoolConfig, chain) + txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}) + + return &testWorkerBackend{ + db: db, + chain: chain, + txPool: txpool, + genesis: gspec, + } +} + +func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } +func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } + +func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*Miner, *testWorkerBackend) { + backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) + backend.txPool.Add(pendingTxs, true, false) + w := New(backend, testConfig, engine) + return w, backend +} + func TestBuildPayload(t *testing.T) { - t.Parallel() var ( db = rawdb.NewMemoryDatabase() recipient = common.HexToAddress("0xdeadbeef") ) w, b := newTestWorker(t, params.TestChainConfig, ethash.NewFaker(), db, 0) - defer w.close() timestamp := uint64(time.Now().Unix()) args := &BuildPayloadArgs{ diff --git a/miner/pending.go b/miner/pending.go new file mode 100644 index 0000000000..bb91fe8969 --- /dev/null +++ b/miner/pending.go @@ -0,0 +1,67 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// pendingTTL indicates the period of time a generated pending block should +// exist to serve RPC requests before being discarded if the parent block +// has not changed yet. The value is chosen to align with the recommit interval. +const pendingTTL = 2 * time.Second + +// pending wraps a pending block with additional metadata. +type pending struct { + created time.Time + parentHash common.Hash + result *newPayloadResult + lock sync.Mutex +} + +// resolve retrieves the cached pending result if it's available. Nothing will be +// returned if the parentHash is not matched or the result is already too old. +// +// Note, don't modify the returned payload result. +func (p *pending) resolve(parentHash common.Hash) *newPayloadResult { + p.lock.Lock() + defer p.lock.Unlock() + + if p.result == nil { + return nil + } + if parentHash != p.parentHash { + return nil + } + if time.Since(p.created) > pendingTTL { + return nil + } + return p.result +} + +// update refreshes the cached pending block with newly created one. +func (p *pending) update(parent common.Hash, result *newPayloadResult) { + p.lock.Lock() + defer p.lock.Unlock() + + p.parentHash = parent + p.result = result + p.created = time.Now() +} diff --git a/miner/stress/clique/main.go b/miner/stress/clique/main.go deleted file mode 100644 index 6059393845..0000000000 --- a/miner/stress/clique/main.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// This file contains a miner stress test based on the Clique consensus engine. -package main - -import ( - "bytes" - "crypto/ecdsa" - "math/big" - "math/rand" - "os" - "os/signal" - "time" - - "github.com/ethereum/go-ethereum/accounts/keystore" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/fdlimit" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/txpool/legacypool" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth" - "github.com/ethereum/go-ethereum/eth/downloader" - "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/miner" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/params" -) - -func main() { - log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true))) - fdlimit.Raise(2048) - - // Generate a batch of accounts to seal and fund with - faucets := make([]*ecdsa.PrivateKey, 128) - for i := 0; i < len(faucets); i++ { - faucets[i], _ = crypto.GenerateKey() - } - sealers := make([]*ecdsa.PrivateKey, 4) - for i := 0; i < len(sealers); i++ { - sealers[i], _ = crypto.GenerateKey() - } - // Create a Clique network based off of the Sepolia config - genesis := makeGenesis(faucets, sealers) - - // Handle interrupts. - interruptCh := make(chan os.Signal, 5) - signal.Notify(interruptCh, os.Interrupt) - - var ( - stacks []*node.Node - nodes []*eth.Ethereum - enodes []*enode.Node - ) - for _, sealer := range sealers { - // Start the node and wait until it's up - stack, ethBackend, err := makeSealer(genesis) - if err != nil { - panic(err) - } - defer stack.Close() - - for stack.Server().NodeInfo().Ports.Listener == 0 { - time.Sleep(250 * time.Millisecond) - } - // Connect the node to all the previous ones - for _, n := range enodes { - stack.Server().AddPeer(n) - } - // Start tracking the node and its enode - stacks = append(stacks, stack) - nodes = append(nodes, ethBackend) - enodes = append(enodes, stack.Server().Self()) - - // Inject the signer key and start sealing with it - ks := keystore.NewKeyStore(stack.KeyStoreDir(), keystore.LightScryptN, keystore.LightScryptP) - signer, err := ks.ImportECDSA(sealer, "") - if err != nil { - panic(err) - } - if err := ks.Unlock(signer, ""); err != nil { - panic(err) - } - stack.AccountManager().AddBackend(ks) - } - - // Iterate over all the nodes and start signing on them - time.Sleep(3 * time.Second) - for _, node := range nodes { - if err := node.StartMining(); err != nil { - panic(err) - } - } - time.Sleep(3 * time.Second) - - // Start injecting transactions from the faucet like crazy - nonces := make([]uint64, len(faucets)) - for { - // Stop when interrupted. - select { - case <-interruptCh: - for _, node := range stacks { - node.Close() - } - return - default: - } - - // Pick a random signer node - index := rand.Intn(len(faucets)) - backend := nodes[index%len(nodes)] - - // Create a self transaction and inject into the pool - tx, err := types.SignTx(types.NewTransaction(nonces[index], crypto.PubkeyToAddress(faucets[index].PublicKey), new(big.Int), 21000, big.NewInt(100000000000), nil), types.HomesteadSigner{}, faucets[index]) - if err != nil { - panic(err) - } - if err := backend.TxPool().Add([]*types.Transaction{tx}, true, false); err != nil { - panic(err) - } - nonces[index]++ - - // Wait if we're too saturated - if pend, _ := backend.TxPool().Stats(); pend > 2048 { - time.Sleep(100 * time.Millisecond) - } - } -} - -// makeGenesis creates a custom Clique genesis block based on some pre-defined -// signer and faucet accounts. -func makeGenesis(faucets []*ecdsa.PrivateKey, sealers []*ecdsa.PrivateKey) *core.Genesis { - // Create a Clique network based off of the Sepolia config - genesis := core.DefaultSepoliaGenesisBlock() - genesis.GasLimit = 25000000 - - genesis.Config.ChainID = big.NewInt(18) - genesis.Config.Clique.Period = 1 - - genesis.Alloc = types.GenesisAlloc{} - for _, faucet := range faucets { - genesis.Alloc[crypto.PubkeyToAddress(faucet.PublicKey)] = types.Account{ - Balance: new(big.Int).Exp(big.NewInt(2), big.NewInt(128), nil), - } - } - // Sort the signers and embed into the extra-data section - signers := make([]common.Address, len(sealers)) - for i, sealer := range sealers { - signers[i] = crypto.PubkeyToAddress(sealer.PublicKey) - } - for i := 0; i < len(signers); i++ { - for j := i + 1; j < len(signers); j++ { - if bytes.Compare(signers[i][:], signers[j][:]) > 0 { - signers[i], signers[j] = signers[j], signers[i] - } - } - } - genesis.ExtraData = make([]byte, 32+len(signers)*common.AddressLength+65) - for i, signer := range signers { - copy(genesis.ExtraData[32+i*common.AddressLength:], signer[:]) - } - // Return the genesis block for initialization - return genesis -} - -func makeSealer(genesis *core.Genesis) (*node.Node, *eth.Ethereum, error) { - // Define the basic configurations for the Ethereum node - datadir, _ := os.MkdirTemp("", "") - - config := &node.Config{ - Name: "geth", - Version: params.Version, - DataDir: datadir, - P2P: p2p.Config{ - ListenAddr: "0.0.0.0:0", - NoDiscovery: true, - MaxPeers: 25, - }, - } - // Start the node and configure a full Ethereum node on it - stack, err := node.New(config) - if err != nil { - return nil, nil, err - } - // Create and register the backend - ethBackend, err := eth.New(stack, ðconfig.Config{ - Genesis: genesis, - NetworkId: genesis.Config.ChainID.Uint64(), - SyncMode: downloader.FullSync, - DatabaseCache: 256, - DatabaseHandles: 256, - TxPool: legacypool.DefaultConfig, - GPO: ethconfig.Defaults.GPO, - Miner: miner.Config{ - GasCeil: genesis.GasLimit * 11 / 10, - GasPrice: big.NewInt(1), - Recommit: time.Second, - }, - }) - if err != nil { - return nil, nil, err - } - - err = stack.Start() - return stack, ethBackend, err -} diff --git a/miner/worker.go b/miner/worker.go index 134f91cafc..7e038b0f30 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -20,12 +20,10 @@ import ( "errors" "fmt" "math/big" - "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc/eip1559" "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" @@ -33,47 +31,11 @@ import ( "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" "github.com/holiman/uint256" ) -const ( - // resultQueueSize is the size of channel listening to sealing result. - resultQueueSize = 10 - - // txChanSize is the size of channel listening to NewTxsEvent. - // The number is referenced from the size of tx pool. - txChanSize = 4096 - - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. - chainHeadChanSize = 10 - - // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. - resubmitAdjustChanSize = 10 - - // minRecommitInterval is the minimal time interval to recreate the sealing block with - // any newly arrived transactions. - minRecommitInterval = 1 * time.Second - - // maxRecommitInterval is the maximum time interval to recreate the sealing block with - // any newly arrived transactions. - maxRecommitInterval = 15 * time.Second - - // intervalAdjustRatio is the impact a single interval adjustment has on sealing work - // resubmitting interval. - intervalAdjustRatio = 0.1 - - // intervalAdjustBias is applied during the new resubmit interval calculation in favor of - // increasing upper limit or decreasing lower limit so that the limit can be reachable. - intervalAdjustBias = 200 * 1000.0 * 1000.0 - - // staleThreshold is the maximum depth of the acceptable stale block. - staleThreshold = 7 -) - var ( errBlockInterruptedByNewHead = errors.New("new head arrived while building block") errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") @@ -96,47 +58,6 @@ type environment struct { blobs int } -// copy creates a deep copy of environment. -func (env *environment) copy() *environment { - cpy := &environment{ - signer: env.signer, - state: env.state.Copy(), - tcount: env.tcount, - coinbase: env.coinbase, - header: types.CopyHeader(env.header), - receipts: copyReceipts(env.receipts), - } - if env.gasPool != nil { - gasPool := *env.gasPool - cpy.gasPool = &gasPool - } - cpy.txs = make([]*types.Transaction, len(env.txs)) - copy(cpy.txs, env.txs) - - cpy.sidecars = make([]*types.BlobTxSidecar, len(env.sidecars)) - copy(cpy.sidecars, env.sidecars) - - return cpy -} - -// discard terminates the background prefetcher go-routine. It should -// always be called for all created environment instances otherwise -// the go-routine leak can happen. -func (env *environment) discard() { - if env.state == nil { - return - } - env.state.StopPrefetcher() -} - -// task contains all information for consensus engine sealing and result submitting. -type task struct { - receipts []*types.Receipt - state *state.StateDB - block *types.Block - createdAt time.Time -} - const ( commitInterruptNone int32 = iota commitInterruptNewHead @@ -144,629 +65,174 @@ const ( commitInterruptTimeout ) -// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. -type newWorkReq struct { - interrupt *atomic.Int32 - timestamp int64 -} - // newPayloadResult is the result of payload generation. type newPayloadResult struct { err error block *types.Block fees *big.Int // total block fees sidecars []*types.BlobTxSidecar // collected blobs of blob transactions + stateDB *state.StateDB // StateDB after executing the transactions + receipts []*types.Receipt // Receipts collected during construction } -// getWorkReq represents a request for getting a new sealing work with provided parameters. -type getWorkReq struct { - params *generateParams - result chan *newPayloadResult // non-blocking channel -} - -// intervalAdjust represents a resubmitting interval adjustment. -type intervalAdjust struct { - ratio float64 - inc bool -} - -// worker is the main object which takes care of submitting new work to consensus engine -// and gathering the sealing result. -type worker struct { - config *Config - chainConfig *params.ChainConfig - engine consensus.Engine - eth Backend - chain *core.BlockChain - - // Feeds - pendingLogsFeed event.Feed - - // Subscriptions - mux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - chainHeadCh chan core.ChainHeadEvent - chainHeadSub event.Subscription - - // Channels - newWorkCh chan *newWorkReq - getWorkCh chan *getWorkReq - taskCh chan *task - resultCh chan *types.Block - startCh chan struct{} - exitCh chan struct{} - resubmitIntervalCh chan time.Duration - resubmitAdjustCh chan *intervalAdjust - - wg sync.WaitGroup - - current *environment // An environment for current running cycle. - - mu sync.RWMutex // The lock used to protect the coinbase and extra fields - coinbase common.Address - extra []byte - tip *uint256.Int // Minimum tip needed for non-local transaction to include them - - pendingMu sync.RWMutex - pendingTasks map[common.Hash]*task - - snapshotMu sync.RWMutex // The lock used to protect the snapshots below - snapshotBlock *types.Block - snapshotReceipts types.Receipts - snapshotState *state.StateDB - - // atomic status counters - running atomic.Bool // The indicator whether the consensus engine is running or not. - newTxs atomic.Int32 // New arrival transaction count since last sealing work submitting. - syncing atomic.Bool // The indicator whether the node is still syncing. - - // newpayloadTimeout is the maximum timeout allowance for creating payload. - // The default value is 2 seconds but node operator can set it to arbitrary - // large value. A large timeout allowance may cause Geth to fail creating - // a non-empty payload within the specified time and eventually miss the slot - // in case there are some computation expensive transactions in txpool. - newpayloadTimeout time.Duration - - // recommit is the time interval to re-create sealing work or to re-build - // payload in proof-of-stake stage. - recommit time.Duration - - // External functions - isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner. - - // Test hooks - newTaskHook func(*task) // Method to call upon receiving a new sealing task. - skipSealHook func(*task) bool // Method to decide whether skipping the sealing. - fullTaskHook func() // Method to call before pushing the full sealing task. - resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. +// generateParams wraps various of settings for generating sealing task. +type generateParams struct { + timestamp uint64 // The timestamp for sealing task + forceTime bool // Flag whether the given timestamp is immutable or not + parentHash common.Hash // Parent block hash, empty means the latest chain head + coinbase common.Address // The fee recipient address for including transaction + random common.Hash // The randomness generated by beacon chain, empty before the merge + withdrawals types.Withdrawals // List of withdrawals to include in block (shanghai field) + beaconRoot *common.Hash // The beacon root (cancun field). + noTxs bool // Flag whether an empty block without any transaction is expected } -func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { - worker := &worker{ - config: config, - chainConfig: chainConfig, - engine: engine, - eth: eth, - chain: eth.BlockChain(), - mux: mux, - isLocalBlock: isLocalBlock, - coinbase: config.Etherbase, - extra: config.ExtraData, - tip: uint256.MustFromBig(config.GasPrice), - pendingTasks: make(map[common.Hash]*task), - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *getWorkReq), - taskCh: make(chan *task), - resultCh: make(chan *types.Block, resultQueueSize), - startCh: make(chan struct{}, 1), - exitCh: make(chan struct{}), - resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), - } - // Subscribe for transaction insertion events (whether from network or resurrects) - worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) - // Subscribe events for blockchain - worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) - - // Sanitize recommit interval if the user-specified one is too short. - recommit := worker.config.Recommit - if recommit < minRecommitInterval { - log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) - recommit = minRecommitInterval - } - worker.recommit = recommit - - // Sanitize the timeout config for creating payload. - newpayloadTimeout := worker.config.NewPayloadTimeout - if newpayloadTimeout == 0 { - log.Warn("Sanitizing new payload timeout to default", "provided", newpayloadTimeout, "updated", DefaultConfig.NewPayloadTimeout) - newpayloadTimeout = DefaultConfig.NewPayloadTimeout - } - if newpayloadTimeout < time.Millisecond*100 { - log.Warn("Low payload timeout may cause high amount of non-full blocks", "provided", newpayloadTimeout, "default", DefaultConfig.NewPayloadTimeout) +// generateWork generates a sealing block based on the given parameters. +func (miner *Miner) generateWork(params *generateParams) *newPayloadResult { + work, err := miner.prepareWork(params) + if err != nil { + return &newPayloadResult{err: err} } - worker.newpayloadTimeout = newpayloadTimeout - - worker.wg.Add(4) - go worker.mainLoop() - go worker.newWorkLoop(recommit) - go worker.resultLoop() - go worker.taskLoop() + if !params.noTxs { + interrupt := new(atomic.Int32) + timer := time.AfterFunc(miner.config.Recommit, func() { + interrupt.Store(commitInterruptTimeout) + }) + defer timer.Stop() - // Submit first work to initialize pending state. - if init { - worker.startCh <- struct{}{} + err := miner.fillTransactions(interrupt, work) + if errors.Is(err, errBlockInterruptedByTimeout) { + log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(miner.config.Recommit)) + } } - return worker -} - -// setEtherbase sets the etherbase used to initialize the block coinbase field. -func (w *worker) setEtherbase(addr common.Address) { - w.mu.Lock() - defer w.mu.Unlock() - w.coinbase = addr -} - -// etherbase retrieves the configured etherbase address. -func (w *worker) etherbase() common.Address { - w.mu.RLock() - defer w.mu.RUnlock() - return w.coinbase -} - -func (w *worker) setGasCeil(ceil uint64) { - w.mu.Lock() - defer w.mu.Unlock() - w.config.GasCeil = ceil -} - -// setExtra sets the content used to initialize the block extra field. -func (w *worker) setExtra(extra []byte) { - w.mu.Lock() - defer w.mu.Unlock() - w.extra = extra -} - -// setGasTip sets the minimum miner tip needed to include a non-local transaction. -func (w *worker) setGasTip(tip *big.Int) { - w.mu.Lock() - defer w.mu.Unlock() - w.tip = uint256.MustFromBig(tip) -} - -// setRecommitInterval updates the interval for miner sealing work recommitting. -func (w *worker) setRecommitInterval(interval time.Duration) { - select { - case w.resubmitIntervalCh <- interval: - case <-w.exitCh: + block, err := miner.engine.FinalizeAndAssemble(miner.chain, work.header, work.state, work.txs, nil, work.receipts, params.withdrawals) + if err != nil { + return &newPayloadResult{err: err} } -} - -// pending returns the pending state and corresponding block. The returned -// values can be nil in case the pending block is not initialized. -func (w *worker) pending() (*types.Block, *state.StateDB) { - w.snapshotMu.RLock() - defer w.snapshotMu.RUnlock() - if w.snapshotState == nil { - return nil, nil + return &newPayloadResult{ + block: block, + fees: totalFees(block, work.receipts), + sidecars: work.sidecars, + stateDB: work.state, + receipts: work.receipts, } - return w.snapshotBlock, w.snapshotState.Copy() } -// pendingBlock returns pending block. The returned block can be nil in case the -// pending block is not initialized. -func (w *worker) pendingBlock() *types.Block { - w.snapshotMu.RLock() - defer w.snapshotMu.RUnlock() - return w.snapshotBlock -} - -// pendingBlockAndReceipts returns pending block and corresponding receipts. -// The returned values can be nil in case the pending block is not initialized. -func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) { - w.snapshotMu.RLock() - defer w.snapshotMu.RUnlock() - return w.snapshotBlock, w.snapshotReceipts -} - -// start sets the running status as 1 and triggers new work submitting. -func (w *worker) start() { - w.running.Store(true) - w.startCh <- struct{}{} -} - -// stop sets the running status as 0. -func (w *worker) stop() { - w.running.Store(false) -} - -// isRunning returns an indicator whether worker is running or not. -func (w *worker) isRunning() bool { - return w.running.Load() -} - -// close terminates all background threads maintained by the worker. -// Note the worker does not support being closed multiple times. -func (w *worker) close() { - w.running.Store(false) - close(w.exitCh) - w.wg.Wait() -} +// prepareWork constructs the sealing task according to the given parameters, +// either based on the last chain head or specified parent. In this function +// the pending transactions are not filled yet, only the empty task returned. +func (miner *Miner) prepareWork(genParams *generateParams) (*environment, error) { + miner.confMu.RLock() + defer miner.confMu.RUnlock() -// recalcRecommit recalculates the resubmitting interval upon feedback. -func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration { - var ( - prevF = float64(prev.Nanoseconds()) - next float64 - ) - if inc { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) - max := float64(maxRecommitInterval.Nanoseconds()) - if next > max { - next = max - } - } else { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) - min := float64(minRecommit.Nanoseconds()) - if next < min { - next = min + // Find the parent block for sealing task + parent := miner.chain.CurrentBlock() + if genParams.parentHash != (common.Hash{}) { + block := miner.chain.GetBlockByHash(genParams.parentHash) + if block == nil { + return nil, fmt.Errorf("missing parent") } + parent = block.Header() } - return time.Duration(int64(next)) -} - -// newWorkLoop is a standalone goroutine to submit new sealing work upon received events. -func (w *worker) newWorkLoop(recommit time.Duration) { - defer w.wg.Done() - var ( - interrupt *atomic.Int32 - minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of sealing. - ) - - timer := time.NewTimer(0) - defer timer.Stop() - <-timer.C // discard the initial tick - - // commit aborts in-flight transaction execution with given signal and resubmits a new one. - commit := func(s int32) { - if interrupt != nil { - interrupt.Store(s) - } - interrupt = new(atomic.Int32) - select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, timestamp: timestamp}: - case <-w.exitCh: - return + // Sanity check the timestamp correctness, recap the timestamp + // to parent+1 if the mutation is allowed. + timestamp := genParams.timestamp + if parent.Time >= timestamp { + if genParams.forceTime { + return nil, fmt.Errorf("invalid timestamp, parent %d given %d", parent.Time, timestamp) } - timer.Reset(recommit) - w.newTxs.Store(0) + timestamp = parent.Time + 1 } - // clearPending cleans the stale pending tasks. - clearPending := func(number uint64) { - w.pendingMu.Lock() - for h, t := range w.pendingTasks { - if t.block.NumberU64()+staleThreshold <= number { - delete(w.pendingTasks, h) - } - } - w.pendingMu.Unlock() + // Construct the sealing block header. + header := &types.Header{ + ParentHash: parent.Hash(), + Number: new(big.Int).Add(parent.Number, common.Big1), + GasLimit: core.CalcGasLimit(parent.GasLimit, miner.config.GasCeil), + Time: timestamp, + Coinbase: genParams.coinbase, } - - for { - select { - case <-w.startCh: - clearPending(w.chain.CurrentBlock().Number.Uint64()) - timestamp = time.Now().Unix() - commit(commitInterruptNewHead) - - case head := <-w.chainHeadCh: - clearPending(head.Block.NumberU64()) - timestamp = time.Now().Unix() - commit(commitInterruptNewHead) - - case <-timer.C: - // If sealing is running resubmit a new work cycle periodically to pull in - // higher priced transactions. Disable this overhead for pending blocks. - if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) { - // Short circuit if no new transaction arrives. - if w.newTxs.Load() == 0 { - timer.Reset(recommit) - continue - } - commit(commitInterruptResubmit) - } - - case interval := <-w.resubmitIntervalCh: - // Adjust resubmit interval explicitly by user. - if interval < minRecommitInterval { - log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval) - interval = minRecommitInterval - } - log.Info("Miner recommit interval update", "from", minRecommit, "to", interval) - minRecommit, recommit = interval, interval - - if w.resubmitHook != nil { - w.resubmitHook(minRecommit, recommit) - } - - case adjust := <-w.resubmitAdjustCh: - // Adjust resubmit interval by feedback. - if adjust.inc { - before := recommit - target := float64(recommit.Nanoseconds()) / adjust.ratio - recommit = recalcRecommit(minRecommit, recommit, target, true) - log.Trace("Increase miner recommit interval", "from", before, "to", recommit) - } else { - before := recommit - recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false) - log.Trace("Decrease miner recommit interval", "from", before, "to", recommit) - } - - if w.resubmitHook != nil { - w.resubmitHook(minRecommit, recommit) - } - - case <-w.exitCh: - return - } + // Set the extra field. + if len(miner.config.ExtraData) != 0 { + header.Extra = miner.config.ExtraData } -} - -// mainLoop is responsible for generating and submitting sealing work based on -// the received event. It can support two modes: automatically generate task and -// submit it or return task according to given parameters for various proposes. -func (w *worker) mainLoop() { - defer w.wg.Done() - defer w.txsSub.Unsubscribe() - defer w.chainHeadSub.Unsubscribe() - defer func() { - if w.current != nil { - w.current.discard() - } - }() - - for { - select { - case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.timestamp) - - case req := <-w.getWorkCh: - req.result <- w.generateWork(req.params) - - case ev := <-w.txsCh: - // Apply transactions to the pending state if we're not sealing - // - // Note all transactions received may not be continuous with transactions - // already included in the current sealing block. These transactions will - // be automatically eliminated. - if !w.isRunning() && w.current != nil { - // If block is already full, abort - if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { - continue - } - txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs)) - for _, tx := range ev.Txs { - acc, _ := types.Sender(w.current.signer, tx) - txs[acc] = append(txs[acc], &txpool.LazyTransaction{ - Pool: w.eth.TxPool(), // We don't know where this came from, yolo resolve from everywhere - Hash: tx.Hash(), - Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in - Time: tx.Time(), - GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - GasTipCap: uint256.MustFromBig(tx.GasTipCap()), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), - }) - } - plainTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) // Mixed bag of everrything, yolo - blobTxs := newTransactionsByPriceAndNonce(w.current.signer, nil, w.current.header.BaseFee) // Empty bag, don't bother optimising - - tcount := w.current.tcount - w.commitTransactions(w.current, plainTxs, blobTxs, nil) - - // Only update the snapshot if any new transactions were added - // to the pending block - if tcount != w.current.tcount { - w.updateSnapshot(w.current) - } - } else { - // Special case, if the consensus engine is 0 period clique(dev mode), - // submit sealing work here since all empty submission will be rejected - // by clique. Of course the advance sealing(empty submission) is disabled. - if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { - w.commitWork(nil, time.Now().Unix()) - } - } - w.newTxs.Add(int32(len(ev.Txs))) - - // System stopped - case <-w.exitCh: - return - case <-w.txsSub.Err(): - return - case <-w.chainHeadSub.Err(): - return - } + // Set the randomness field from the beacon chain if it's available. + if genParams.random != (common.Hash{}) { + header.MixDigest = genParams.random } -} - -// taskLoop is a standalone goroutine to fetch sealing task from the generator and -// push them to consensus engine. -func (w *worker) taskLoop() { - defer w.wg.Done() - var ( - stopCh chan struct{} - prev common.Hash - ) - - // interrupt aborts the in-flight sealing task. - interrupt := func() { - if stopCh != nil { - close(stopCh) - stopCh = nil + // Set baseFee and GasLimit if we are on an EIP-1559 chain + if miner.chainConfig.IsLondon(header.Number) { + header.BaseFee = eip1559.CalcBaseFee(miner.chainConfig, parent) + if !miner.chainConfig.IsLondon(parent.Number) { + parentGasLimit := parent.GasLimit * miner.chainConfig.ElasticityMultiplier() + header.GasLimit = core.CalcGasLimit(parentGasLimit, miner.config.GasCeil) } } - for { - select { - case task := <-w.taskCh: - if w.newTaskHook != nil { - w.newTaskHook(task) - } - // Reject duplicate sealing work due to resubmitting. - sealHash := w.engine.SealHash(task.block.Header()) - if sealHash == prev { - continue - } - // Interrupt previous sealing operation - interrupt() - stopCh, prev = make(chan struct{}), sealHash - - if w.skipSealHook != nil && w.skipSealHook(task) { - continue - } - w.pendingMu.Lock() - w.pendingTasks[sealHash] = task - w.pendingMu.Unlock() - - if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { - log.Warn("Block sealing failed", "err", err) - w.pendingMu.Lock() - delete(w.pendingTasks, sealHash) - w.pendingMu.Unlock() - } - case <-w.exitCh: - interrupt() - return + // Apply EIP-4844, EIP-4788. + if miner.chainConfig.IsCancun(header.Number, header.Time) { + var excessBlobGas uint64 + if miner.chainConfig.IsCancun(parent.Number, parent.Time) { + excessBlobGas = eip4844.CalcExcessBlobGas(*parent.ExcessBlobGas, *parent.BlobGasUsed) + } else { + // For the first post-fork block, both parent.data_gas_used and parent.excess_data_gas are evaluated as 0 + excessBlobGas = eip4844.CalcExcessBlobGas(0, 0) } + header.BlobGasUsed = new(uint64) + header.ExcessBlobGas = &excessBlobGas + header.ParentBeaconRoot = genParams.beaconRoot } -} - -// resultLoop is a standalone goroutine to handle sealing result submitting -// and flush relative data to the database. -func (w *worker) resultLoop() { - defer w.wg.Done() - for { - select { - case block := <-w.resultCh: - // Short circuit when receiving empty result. - if block == nil { - continue - } - // Short circuit when receiving duplicate result caused by resubmitting. - if w.chain.HasBlock(block.Hash(), block.NumberU64()) { - continue - } - var ( - sealhash = w.engine.SealHash(block.Header()) - hash = block.Hash() - ) - w.pendingMu.RLock() - task, exist := w.pendingTasks[sealhash] - w.pendingMu.RUnlock() - if !exist { - log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash) - continue - } - // Different block could share same sealhash, deep copy here to prevent write-write conflict. - var ( - receipts = make([]*types.Receipt, len(task.receipts)) - logs []*types.Log - ) - for i, taskReceipt := range task.receipts { - receipt := new(types.Receipt) - receipts[i] = receipt - *receipt = *taskReceipt - - // add block location fields - receipt.BlockHash = hash - receipt.BlockNumber = block.Number() - receipt.TransactionIndex = uint(i) - - // Update the block hash in all logs since it is now available and not when the - // receipt/log of individual transactions were created. - receipt.Logs = make([]*types.Log, len(taskReceipt.Logs)) - for i, taskLog := range taskReceipt.Logs { - log := new(types.Log) - receipt.Logs[i] = log - *log = *taskLog - log.BlockHash = hash - } - logs = append(logs, receipt.Logs...) - } - // Commit block and state to database. - _, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true) - if err != nil { - log.Error("Failed writing block to chain", "err", err) - continue - } - log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, - "elapsed", common.PrettyDuration(time.Since(task.createdAt))) - - // Broadcast the block and announce chain insertion event - w.mux.Post(core.NewMinedBlockEvent{Block: block}) - - case <-w.exitCh: - return - } + // Run the consensus preparation with the default or customized consensus engine. + if err := miner.engine.Prepare(miner.chain, header); err != nil { + log.Error("Failed to prepare header for sealing", "err", err) + return nil, err + } + // Could potentially happen if starting to mine in an odd state. + // Note genParams.coinbase can be different with header.Coinbase + // since clique algorithm can modify the coinbase field in header. + env, err := miner.makeEnv(parent, header, genParams.coinbase) + if err != nil { + log.Error("Failed to create sealing context", "err", err) + return nil, err + } + if header.ParentBeaconRoot != nil { + context := core.NewEVMBlockContext(header, miner.chain, nil) + vmenv := vm.NewEVM(context, vm.TxContext{}, env.state, miner.chainConfig, vm.Config{}) + core.ProcessBeaconBlockRoot(*header.ParentBeaconRoot, vmenv, env.state) } + return env, nil } // makeEnv creates a new environment for the sealing block. -func (w *worker) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address) (*environment, error) { +func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address) (*environment, error) { // Retrieve the parent state to execute on top and start a prefetcher for // the miner to speed block sealing up a bit. - state, err := w.chain.StateAt(parent.Root) + state, err := miner.chain.StateAt(parent.Root) if err != nil { return nil, err } - state.StartPrefetcher("miner") - // Note the passed coinbase may be different with header.Coinbase. - env := &environment{ - signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), + return &environment{ + signer: types.MakeSigner(miner.chainConfig, header.Number, header.Time), state: state, coinbase: coinbase, header: header, - } - // Keep track of transactions which return errors so they can be removed - env.tcount = 0 - return env, nil + }, nil } -// updateSnapshot updates pending snapshot block, receipts and state. -func (w *worker) updateSnapshot(env *environment) { - w.snapshotMu.Lock() - defer w.snapshotMu.Unlock() - - w.snapshotBlock = types.NewBlock( - env.header, - env.txs, - nil, - env.receipts, - trie.NewStackTrie(nil), - ) - w.snapshotReceipts = copyReceipts(env.receipts) - w.snapshotState = env.state.Copy() -} - -func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) { +func (miner *Miner) commitTransaction(env *environment, tx *types.Transaction) error { if tx.Type() == types.BlobTxType { - return w.commitBlobTransaction(env, tx) + return miner.commitBlobTransaction(env, tx) } - receipt, err := w.applyTransaction(env, tx) + receipt, err := miner.applyTransaction(env, tx) if err != nil { - return nil, err + return err } env.txs = append(env.txs, tx) env.receipts = append(env.receipts, receipt) - return receipt.Logs, nil + env.tcount++ + return nil } -func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) { +func (miner *Miner) commitBlobTransaction(env *environment, tx *types.Transaction) error { sc := tx.BlobTxSidecar() if sc == nil { panic("blob transaction without blobs in miner") @@ -776,27 +242,28 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction) // and not during execution. This means core.ApplyTransaction will not return an error if the // tx has too many blobs. So we have to explicitly check it here. if (env.blobs+len(sc.Blobs))*params.BlobTxBlobGasPerBlob > params.MaxBlobGasPerBlock { - return nil, errors.New("max data blobs reached") + return errors.New("max data blobs reached") } - receipt, err := w.applyTransaction(env, tx) + receipt, err := miner.applyTransaction(env, tx) if err != nil { - return nil, err + return err } env.txs = append(env.txs, tx.WithoutBlobTxSidecar()) env.receipts = append(env.receipts, receipt) env.sidecars = append(env.sidecars, sc) env.blobs += len(sc.Blobs) *env.header.BlobGasUsed += receipt.BlobGasUsed - return receipt.Logs, nil + env.tcount++ + return nil } // applyTransaction runs the transaction. If execution fails, state and gas pool are reverted. -func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*types.Receipt, error) { +func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (*types.Receipt, error) { var ( snap = env.state.Snapshot() gp = env.gasPool.Gas() ) - receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig()) + receipt, err := core.ApplyTransaction(miner.chainConfig, miner.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *miner.chain.GetVMConfig()) if err != nil { env.state.RevertToSnapshot(snap) env.gasPool.SetGas(gp) @@ -804,13 +271,11 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ return receipt, err } -func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { +func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) } - var coalescedLogs []*types.Log - for { // Check interruption signal and abort building if it's fired. if interrupt != nil { @@ -877,15 +342,15 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { - log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", w.chainConfig.EIP155Block) + if tx.Protected() && !miner.chainConfig.IsEIP155(env.header.Number) { + log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", miner.chainConfig.EIP155Block) txs.Pop() continue } // Start executing the transaction env.state.SetTxContext(tx.Hash(), env.tcount) - logs, err := w.commitTransaction(env, tx) + err := miner.commitTransaction(env, tx) switch { case errors.Is(err, core.ErrNonceTooLow): // New head notification data race between the transaction pool and miner, shift @@ -894,8 +359,6 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - env.tcount++ txs.Shift() default: @@ -905,130 +368,20 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac txs.Pop() } } - if !w.isRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are sealing. The reason is that - // when we are sealing, the worker will regenerate a sealing block every 3 seconds. - // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. - - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l - } - w.pendingLogsFeed.Send(cpy) - } return nil } -// generateParams wraps various of settings for generating sealing task. -type generateParams struct { - timestamp uint64 // The timestamp for sealing task - forceTime bool // Flag whether the given timestamp is immutable or not - parentHash common.Hash // Parent block hash, empty means the latest chain head - coinbase common.Address // The fee recipient address for including transaction - random common.Hash // The randomness generated by beacon chain, empty before the merge - withdrawals types.Withdrawals // List of withdrawals to include in block. - beaconRoot *common.Hash // The beacon root (cancun field). - noTxs bool // Flag whether an empty block without any transaction is expected -} - -// prepareWork constructs the sealing task according to the given parameters, -// either based on the last chain head or specified parent. In this function -// the pending transactions are not filled yet, only the empty task returned. -func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { - w.mu.RLock() - defer w.mu.RUnlock() - - // Find the parent block for sealing task - parent := w.chain.CurrentBlock() - if genParams.parentHash != (common.Hash{}) { - block := w.chain.GetBlockByHash(genParams.parentHash) - if block == nil { - return nil, errors.New("missing parent") - } - parent = block.Header() - } - // Sanity check the timestamp correctness, recap the timestamp - // to parent+1 if the mutation is allowed. - timestamp := genParams.timestamp - if parent.Time >= timestamp { - if genParams.forceTime { - return nil, fmt.Errorf("invalid timestamp, parent %d given %d", parent.Time, timestamp) - } - timestamp = parent.Time + 1 - } - // Construct the sealing block header. - header := &types.Header{ - ParentHash: parent.Hash(), - Number: new(big.Int).Add(parent.Number, common.Big1), - GasLimit: core.CalcGasLimit(parent.GasLimit, w.config.GasCeil), - Time: timestamp, - Coinbase: genParams.coinbase, - } - // Set the extra field. - if len(w.extra) != 0 { - header.Extra = w.extra - } - // Set the randomness field from the beacon chain if it's available. - if genParams.random != (common.Hash{}) { - header.MixDigest = genParams.random - } - // Set baseFee and GasLimit if we are on an EIP-1559 chain - if w.chainConfig.IsLondon(header.Number) { - header.BaseFee = eip1559.CalcBaseFee(w.chainConfig, parent) - if !w.chainConfig.IsLondon(parent.Number) { - parentGasLimit := parent.GasLimit * w.chainConfig.ElasticityMultiplier() - header.GasLimit = core.CalcGasLimit(parentGasLimit, w.config.GasCeil) - } - } - // Apply EIP-4844, EIP-4788. - if w.chainConfig.IsCancun(header.Number, header.Time) { - var excessBlobGas uint64 - if w.chainConfig.IsCancun(parent.Number, parent.Time) { - excessBlobGas = eip4844.CalcExcessBlobGas(*parent.ExcessBlobGas, *parent.BlobGasUsed) - } else { - // For the first post-fork block, both parent.data_gas_used and parent.excess_data_gas are evaluated as 0 - excessBlobGas = eip4844.CalcExcessBlobGas(0, 0) - } - header.BlobGasUsed = new(uint64) - header.ExcessBlobGas = &excessBlobGas - header.ParentBeaconRoot = genParams.beaconRoot - } - // Run the consensus preparation with the default or customized consensus engine. - if err := w.engine.Prepare(w.chain, header); err != nil { - log.Error("Failed to prepare header for sealing", "err", err) - return nil, err - } - // Could potentially happen if starting to mine in an odd state. - // Note genParams.coinbase can be different with header.Coinbase - // since clique algorithm can modify the coinbase field in header. - env, err := w.makeEnv(parent, header, genParams.coinbase) - if err != nil { - log.Error("Failed to create sealing context", "err", err) - return nil, err - } - if header.ParentBeaconRoot != nil { - context := core.NewEVMBlockContext(header, w.chain, nil) - vmenv := vm.NewEVM(context, vm.TxContext{}, env.state, w.chainConfig, vm.Config{}) - core.ProcessBeaconBlockRoot(*header.ParentBeaconRoot, vmenv, env.state) - } - return env, nil -} - // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error { - w.mu.RLock() - tip := w.tip - w.mu.RUnlock() +func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) error { + miner.confMu.RLock() + tip := miner.config.GasPrice + miner.confMu.RUnlock() // Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees filter := txpool.PendingFilter{ - MinTip: tip, + MinTip: uint256.MustFromBig(tip), } if env.header.BaseFee != nil { filter.BaseFee = uint256.MustFromBig(env.header.BaseFee) @@ -1037,16 +390,16 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas)) } filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false - pendingPlainTxs := w.eth.TxPool().Pending(filter) + pendingPlainTxs := miner.txpool.Pending(filter) filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true - pendingBlobTxs := w.eth.TxPool().Pending(filter) + pendingBlobTxs := miner.txpool.Pending(filter) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs - for _, account := range w.eth.TxPool().Locals() { + for _, account := range miner.txpool.Locals() { if txs := remotePlainTxs[account]; len(txs) > 0 { delete(remotePlainTxs, account) localPlainTxs[account] = txs @@ -1061,7 +414,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee) blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee) - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { + if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { return err } } @@ -1069,189 +422,13 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee) blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee) - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { + if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { return err } } return nil } -// generateWork generates a sealing block based on the given parameters. -func (w *worker) generateWork(params *generateParams) *newPayloadResult { - work, err := w.prepareWork(params) - if err != nil { - return &newPayloadResult{err: err} - } - defer work.discard() - - if !params.noTxs { - interrupt := new(atomic.Int32) - timer := time.AfterFunc(w.newpayloadTimeout, func() { - interrupt.Store(commitInterruptTimeout) - }) - defer timer.Stop() - - err := w.fillTransactions(interrupt, work) - if errors.Is(err, errBlockInterruptedByTimeout) { - log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout)) - } - } - block, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, nil, work.receipts, params.withdrawals) - if err != nil { - return &newPayloadResult{err: err} - } - return &newPayloadResult{ - block: block, - fees: totalFees(block, work.receipts), - sidecars: work.sidecars, - } -} - -// commitWork generates several new sealing tasks based on the parent block -// and submit them to the sealer. -func (w *worker) commitWork(interrupt *atomic.Int32, timestamp int64) { - // Abort committing if node is still syncing - if w.syncing.Load() { - return - } - start := time.Now() - - // Set the coinbase if the worker is running or it's required - var coinbase common.Address - if w.isRunning() { - coinbase = w.etherbase() - if coinbase == (common.Address{}) { - log.Error("Refusing to mine without etherbase") - return - } - } - work, err := w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - }) - if err != nil { - return - } - // Fill pending transactions from the txpool into the block. - err = w.fillTransactions(interrupt, work) - switch { - case err == nil: - // The entire block is filled, decrease resubmit interval in case - // of current interval is larger than the user-specified one. - w.adjustResubmitInterval(&intervalAdjust{inc: false}) - - case errors.Is(err, errBlockInterruptedByRecommit): - // Notify resubmit loop to increase resubmitting interval if the - // interruption is due to frequent commits. - gaslimit := work.header.GasLimit - ratio := float64(gaslimit-work.gasPool.Gas()) / float64(gaslimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.adjustResubmitInterval(&intervalAdjust{ - ratio: ratio, - inc: true, - }) - - case errors.Is(err, errBlockInterruptedByNewHead): - // If the block building is interrupted by newhead event, discard it - // totally. Committing the interrupted block introduces unnecessary - // delay, and possibly causes miner to mine on the previous head, - // which could result in higher uncle rate. - work.discard() - return - } - // Submit the generated block for consensus sealing. - w.commit(work.copy(), w.fullTaskHook, true, start) - - // Swap out the old work with the new one, terminating any leftover - // prefetcher processes in the mean time and starting a new one. - if w.current != nil { - w.current.discard() - } - w.current = work -} - -// commit runs any post-transaction state modifications, assembles the final block -// and commits new work if consensus engine is running. -// Note the assumption is held that the mutation is allowed to the passed env, do -// the deep copy first. -func (w *worker) commit(env *environment, interval func(), update bool, start time.Time) error { - if w.isRunning() { - if interval != nil { - interval() - } - // Create a local environment copy, avoid the data race with snapshot state. - // https://github.com/ethereum/go-ethereum/issues/24299 - env := env.copy() - // Withdrawals are set to nil here, because this is only called in PoW. - block, err := w.engine.FinalizeAndAssemble(w.chain, env.header, env.state, env.txs, nil, env.receipts, nil) - if err != nil { - return err - } - // If we're post merge, just ignore - if !w.isTTDReached(block.Header()) { - select { - case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}: - fees := totalFees(block, env.receipts) - feesInEther := new(big.Float).Quo(new(big.Float).SetInt(fees), big.NewFloat(params.Ether)) - log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), - "txs", env.tcount, "gas", block.GasUsed(), "fees", feesInEther, - "elapsed", common.PrettyDuration(time.Since(start))) - - case <-w.exitCh: - log.Info("Worker has exited") - } - } - } - if update { - w.updateSnapshot(env) - } - return nil -} - -// getSealingBlock generates the sealing block based on the given parameters. -// The generation result will be passed back via the given channel no matter -// the generation itself succeeds or not. -func (w *worker) getSealingBlock(params *generateParams) *newPayloadResult { - req := &getWorkReq{ - params: params, - result: make(chan *newPayloadResult, 1), - } - select { - case w.getWorkCh <- req: - return <-req.result - case <-w.exitCh: - return &newPayloadResult{err: errors.New("miner closed")} - } -} - -// isTTDReached returns the indicator if the given block has reached the total -// terminal difficulty for The Merge transition. -func (w *worker) isTTDReached(header *types.Header) bool { - td, ttd := w.chain.GetTd(header.ParentHash, header.Number.Uint64()-1), w.chain.Config().TerminalTotalDifficulty - return td != nil && ttd != nil && td.Cmp(ttd) >= 0 -} - -// adjustResubmitInterval adjusts the resubmit interval. -func (w *worker) adjustResubmitInterval(message *intervalAdjust) { - select { - case w.resubmitAdjustCh <- message: - default: - log.Warn("the resubmitAdjustCh is full, discard the message") - } -} - -// copyReceipts makes a deep copy of the given receipts. -func copyReceipts(receipts []*types.Receipt) []*types.Receipt { - result := make([]*types.Receipt, len(receipts)) - for i, l := range receipts { - cpy := *l - result[i] = &cpy - } - return result -} - // totalFees computes total consumed miner fees in Wei. Block transactions and receipts have to have the same order. func totalFees(block *types.Block, receipts []*types.Receipt) *big.Int { feesWei := new(big.Int) diff --git a/miner/worker_test.go b/miner/worker_test.go deleted file mode 100644 index 9dba12ae51..0000000000 --- a/miner/worker_test.go +++ /dev/null @@ -1,510 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package miner - -import ( - "math/big" - "sync/atomic" - "testing" - "time" - - "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/clique" - "github.com/ethereum/go-ethereum/consensus/ethash" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/txpool" - "github.com/ethereum/go-ethereum/core/txpool/legacypool" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" -) - -const ( - // testCode is the testing contract binary code which will initialises some - // variables in constructor - testCode = "0x60806040527fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff0060005534801561003457600080fd5b5060fc806100436000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c80630c4dae8814603757806398a213cf146053575b600080fd5b603d607e565b6040518082815260200191505060405180910390f35b607c60048036036020811015606757600080fd5b81019080803590602001909291905050506084565b005b60005481565b806000819055507fe9e44f9f7da8c559de847a3232b57364adc0354f15a2cd8dc636d54396f9587a6000546040518082815260200191505060405180910390a15056fea265627a7a723058208ae31d9424f2d0bc2a3da1a5dd659db2d71ec322a17db8f87e19e209e3a1ff4a64736f6c634300050a0032" - - // testGas is the gas required for contract deployment. - testGas = 144109 -) - -var ( - // Test chain configurations - testTxPoolConfig legacypool.Config - ethashChainConfig *params.ChainConfig - cliqueChainConfig *params.ChainConfig - - // Test accounts - testBankKey, _ = crypto.GenerateKey() - testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) - testBankFunds = big.NewInt(1000000000000000000) - - testUserKey, _ = crypto.GenerateKey() - testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey) - - // Test transactions - pendingTxs []*types.Transaction - newTxs []*types.Transaction - - testConfig = &Config{ - Recommit: time.Second, - GasCeil: params.GenesisGasLimit, - } -) - -func init() { - testTxPoolConfig = legacypool.DefaultConfig - testTxPoolConfig.Journal = "" - ethashChainConfig = new(params.ChainConfig) - *ethashChainConfig = *params.TestChainConfig - cliqueChainConfig = new(params.ChainConfig) - *cliqueChainConfig = *params.TestChainConfig - cliqueChainConfig.Clique = ¶ms.CliqueConfig{ - Period: 10, - Epoch: 30000, - } - - signer := types.LatestSigner(params.TestChainConfig) - tx1 := types.MustSignNewTx(testBankKey, signer, &types.AccessListTx{ - ChainID: params.TestChainConfig.ChainID, - Nonce: 0, - To: &testUserAddress, - Value: big.NewInt(1000), - Gas: params.TxGas, - GasPrice: big.NewInt(params.InitialBaseFee), - }) - pendingTxs = append(pendingTxs, tx1) - - tx2 := types.MustSignNewTx(testBankKey, signer, &types.LegacyTx{ - Nonce: 1, - To: &testUserAddress, - Value: big.NewInt(1000), - Gas: params.TxGas, - GasPrice: big.NewInt(params.InitialBaseFee), - }) - newTxs = append(newTxs, tx2) -} - -// testWorkerBackend implements worker.Backend interfaces and wraps all information needed during the testing. -type testWorkerBackend struct { - db ethdb.Database - txPool *txpool.TxPool - chain *core.BlockChain - genesis *core.Genesis -} - -func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, n int) *testWorkerBackend { - var gspec = &core.Genesis{ - Config: chainConfig, - Alloc: types.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, - } - switch e := engine.(type) { - case *clique.Clique: - gspec.ExtraData = make([]byte, 32+common.AddressLength+crypto.SignatureLength) - copy(gspec.ExtraData[32:32+common.AddressLength], testBankAddress.Bytes()) - e.Authorize(testBankAddress, func(account accounts.Account, s string, data []byte) ([]byte, error) { - return crypto.Sign(crypto.Keccak256(data), testBankKey) - }) - case *ethash.Ethash: - default: - t.Fatalf("unexpected consensus engine type: %T", engine) - } - chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec, nil, engine, vm.Config{}, nil, nil) - if err != nil { - t.Fatalf("core.NewBlockChain failed: %v", err) - } - pool := legacypool.New(testTxPoolConfig, chain) - txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, chain, []txpool.SubPool{pool}) - - return &testWorkerBackend{ - db: db, - chain: chain, - txPool: txpool, - genesis: gspec, - } -} - -func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } -func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } - -func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { - var tx *types.Transaction - gasPrice := big.NewInt(10 * params.InitialBaseFee) - if creation { - tx, _ = types.SignTx(types.NewContractCreation(b.txPool.Nonce(testBankAddress), big.NewInt(0), testGas, gasPrice, common.FromHex(testCode)), types.HomesteadSigner{}, testBankKey) - } else { - tx, _ = types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, gasPrice, nil), types.HomesteadSigner{}, testBankKey) - } - return tx -} - -func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) { - backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) - backend.txPool.Add(pendingTxs, true, false) - w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) - w.setEtherbase(testBankAddress) - return w, backend -} - -func TestGenerateAndImportBlock(t *testing.T) { - t.Parallel() - var ( - db = rawdb.NewMemoryDatabase() - config = *params.AllCliqueProtocolChanges - ) - config.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} - engine := clique.New(config.Clique, db) - - w, b := newTestWorker(t, &config, engine, db, 0) - defer w.close() - - // This test chain imports the mined blocks. - chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, b.genesis, nil, engine, vm.Config{}, nil, nil) - defer chain.Stop() - - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - - // Wait for mined blocks. - sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) - defer sub.Unsubscribe() - - // Start mining! - w.start() - - for i := 0; i < 5; i++ { - b.txPool.Add([]*types.Transaction{b.newRandomTx(true)}, true, false) - b.txPool.Add([]*types.Transaction{b.newRandomTx(false)}, true, false) - - select { - case ev := <-sub.Chan(): - block := ev.Data.(core.NewMinedBlockEvent).Block - if _, err := chain.InsertChain([]*types.Block{block}); err != nil { - t.Fatalf("failed to insert new mined block %d: %v", block.NumberU64(), err) - } - case <-time.After(3 * time.Second): // Worker needs 1s to include new changes. - t.Fatalf("timeout") - } - } -} - -func TestEmptyWorkEthash(t *testing.T) { - t.Parallel() - testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) -} -func TestEmptyWorkClique(t *testing.T) { - t.Parallel() - testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - taskCh := make(chan struct{}, 2) - checkEqual := func(t *testing.T, task *task) { - // The work should contain 1 tx - receiptLen, balance := 1, uint256.NewInt(1000) - if len(task.receipts) != receiptLen { - t.Fatalf("receipt number mismatch: have %d, want %d", len(task.receipts), receiptLen) - } - if task.state.GetBalance(testUserAddress).Cmp(balance) != 0 { - t.Fatalf("account balance mismatch: have %d, want %d", task.state.GetBalance(testUserAddress), balance) - } - } - w.newTaskHook = func(task *task) { - if task.block.NumberU64() == 1 { - checkEqual(t, task) - taskCh <- struct{}{} - } - } - w.skipSealHook = func(task *task) bool { return true } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - w.start() // Start mining! - select { - case <-taskCh: - case <-time.NewTimer(3 * time.Second).C: - t.Error("new task timeout") - } -} - -func TestAdjustIntervalEthash(t *testing.T) { - t.Parallel() - testAdjustInterval(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestAdjustIntervalClique(t *testing.T) { - t.Parallel() - testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - var ( - progress = make(chan struct{}, 10) - result = make([]float64, 0, 10) - index = 0 - start atomic.Bool - ) - w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { - // Short circuit if interval checking hasn't started. - if !start.Load() { - return - } - var wantMinInterval, wantRecommitInterval time.Duration - - switch index { - case 0: - wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second - case 1: - origin := float64(3 * time.Second.Nanoseconds()) - estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 2: - estimate := result[index-1] - min := float64(3 * time.Second.Nanoseconds()) - estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 3: - wantMinInterval, wantRecommitInterval = time.Second, time.Second - } - - // Check interval - if minInterval != wantMinInterval { - t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval) - } - if recommitInterval != wantRecommitInterval { - t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval) - } - result = append(result, float64(recommitInterval.Nanoseconds())) - index += 1 - progress <- struct{}{} - } - w.start() - - time.Sleep(time.Second) // Ensure two tasks have been submitted due to start opt - start.Store(true) - - w.setRecommitInterval(3 * time.Second) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.setRecommitInterval(500 * time.Millisecond) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } -} - -func TestGetSealingWorkEthash(t *testing.T) { - t.Parallel() - testGetSealingWork(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestGetSealingWorkClique(t *testing.T) { - t.Parallel() - testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func TestGetSealingWorkPostMerge(t *testing.T) { - t.Parallel() - local := new(params.ChainConfig) - *local = *ethashChainConfig - local.TerminalTotalDifficulty = big.NewInt(0) - testGetSealingWork(t, local, ethash.NewFaker()) -} - -func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - w.setExtra([]byte{0x01, 0x02}) - - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - timestamp := uint64(time.Now().Unix()) - assertBlock := func(block *types.Block, number uint64, coinbase common.Address, random common.Hash) { - if block.Time() != timestamp { - // Sometime the timestamp will be mutated if the timestamp - // is even smaller than parent block's. It's OK. - t.Logf("Invalid timestamp, want %d, get %d", timestamp, block.Time()) - } - _, isClique := engine.(*clique.Clique) - if !isClique { - if len(block.Extra()) != 2 { - t.Error("Unexpected extra field") - } - if block.Coinbase() != coinbase { - t.Errorf("Unexpected coinbase got %x want %x", block.Coinbase(), coinbase) - } - } else { - if block.Coinbase() != (common.Address{}) { - t.Error("Unexpected coinbase") - } - } - if !isClique { - if block.MixDigest() != random { - t.Error("Unexpected mix digest") - } - } - if block.Nonce() != 0 { - t.Error("Unexpected block nonce") - } - if block.NumberU64() != number { - t.Errorf("Mismatched block number, want %d got %d", number, block.NumberU64()) - } - } - var cases = []struct { - parent common.Hash - coinbase common.Address - random common.Hash - expectNumber uint64 - expectErr bool - }{ - { - b.chain.Genesis().Hash(), - common.HexToAddress("0xdeadbeef"), - common.HexToHash("0xcafebabe"), - uint64(1), - false, - }, - { - b.chain.CurrentBlock().Hash(), - common.HexToAddress("0xdeadbeef"), - common.HexToHash("0xcafebabe"), - b.chain.CurrentBlock().Number.Uint64() + 1, - false, - }, - { - b.chain.CurrentBlock().Hash(), - common.Address{}, - common.HexToHash("0xcafebabe"), - b.chain.CurrentBlock().Number.Uint64() + 1, - false, - }, - { - b.chain.CurrentBlock().Hash(), - common.Address{}, - common.Hash{}, - b.chain.CurrentBlock().Number.Uint64() + 1, - false, - }, - { - common.HexToHash("0xdeadbeef"), - common.HexToAddress("0xdeadbeef"), - common.HexToHash("0xcafebabe"), - 0, - true, - }, - } - - // This API should work even when the automatic sealing is not enabled - for _, c := range cases { - r := w.getSealingBlock(&generateParams{ - parentHash: c.parent, - timestamp: timestamp, - coinbase: c.coinbase, - random: c.random, - withdrawals: nil, - beaconRoot: nil, - noTxs: false, - forceTime: true, - }) - if c.expectErr { - if r.err == nil { - t.Error("Expect error but get nil") - } - } else { - if r.err != nil { - t.Errorf("Unexpected error %v", r.err) - } - assertBlock(r.block, c.expectNumber, c.coinbase, c.random) - } - } - - // This API should work even when the automatic sealing is enabled - w.start() - for _, c := range cases { - r := w.getSealingBlock(&generateParams{ - parentHash: c.parent, - timestamp: timestamp, - coinbase: c.coinbase, - random: c.random, - withdrawals: nil, - beaconRoot: nil, - noTxs: false, - forceTime: true, - }) - if c.expectErr { - if r.err == nil { - t.Error("Expect error but get nil") - } - } else { - if r.err != nil { - t.Errorf("Unexpected error %v", r.err) - } - assertBlock(r.block, c.expectNumber, c.coinbase, c.random) - } - } -}