eth/catalyst, miner: build the execution payload async (#24866)

* eth/catalyst: build the execution payload async

* miner: added comment, added test case

* eth/catalyst: miner: move async block production to miner

* eth/catalyst, miner: support generate seal block async

* miner: rework GetSealingBlockAsync to use a passed channel

* miner: apply rjl's diff

* eth/catalyst: nitpicks

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
pull/24900/head
Marius van der Wijden 3 years ago committed by GitHub
parent e6fa102eb0
commit cc9fb8e21d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      eth/catalyst/api.go
  2. 82
      eth/catalyst/api_test.go
  3. 54
      eth/catalyst/queue.go
  4. 30
      miner/miner.go
  5. 35
      miner/worker.go
  6. 8
      miner/worker_test.go

@ -197,18 +197,19 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// sealed by the beacon client. The payload will be requested later, and we
// might replace it arbitrarily many times in between.
if payloadAttributes != nil {
log.Info("Creating new payload for sealing")
start := time.Now()
data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes)
// Create an empty block first which can be used as a fallback
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true)
if err != nil {
return valid(nil), err
}
// Send a request to generate a full block in the background.
// The result can be obtained via the returned channel.
resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
if err != nil {
log.Error("Failed to create sealing payload", "err", err)
return valid(nil), err // valid setHead, invalid payload
return valid(nil), err
}
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, data)
log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start))
api.localBlocks.put(id, &payload{empty: empty, result: resCh})
return valid(&id), nil
}
return valid(nil), nil
@ -344,14 +345,3 @@ func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 {
errorMsg := err.Error()
return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: &currentHash, ValidationError: &errorMsg}
}
// assembleBlock creates a new block and returns the "execution
// data" required for beacon clients to process the new block.
func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
log.Info("Producing block", "parentHash", parentHash)
block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
}

@ -93,7 +93,7 @@ func TestEth2AssembleBlock(t *testing.T) {
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[9].Time() + 5,
}
execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams)
execData, err := assembleBlock(api, blocks[9].Hash(), &blockParams)
if err != nil {
t.Fatalf("error producing block, err=%v", err)
}
@ -114,7 +114,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams)
execData, err := assembleBlock(api, blocks[8].Hash(), &blockParams)
if err != nil {
t.Fatalf("error producing block, err=%v", err)
}
@ -273,7 +273,7 @@ func TestEth2NewBlock(t *testing.T) {
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 5,
})
if err != nil {
@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) {
)
parent = preMergeBlocks[len(preMergeBlocks)-1]
for i := 0; i < 10; i++ {
execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 6,
})
if err != nil {
@ -530,3 +530,77 @@ func TestExchangeTransitionConfig(t *testing.T) {
t.Fatalf("expected no error on valid config, got %v", err)
}
}
func TestEmptyBlocks(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
// This EVM code generates a log when the contract is created.
logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
)
for i := 0; i < 10; i++ {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(i)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
resp, err := api.ForkchoiceUpdatedV1(fcState, &params)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
if resp.PayloadStatus.Status != beacon.VALID {
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
}
payload, err := api.GetPayloadV1(*resp.PayloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
// TODO(493456442, marius) this test can be flaky since we rely on a 100ms
// allowance for block generation internally.
if len(payload.Transactions) == 0 {
t.Fatalf("payload should not be empty")
}
execResp, err := api.NewPayloadV1(*payload)
if err != nil {
t.Fatalf("can't execute payload: %v", err)
}
if execResp.Status != beacon.VALID {
t.Fatalf("invalid status: %v", execResp.Status)
}
fcState = beacon.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.ParentHash,
FinalizedBlockHash: payload.ParentHash,
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number {
t.Fatalf("Chain head should be updated")
}
parent = ethservice.BlockChain().CurrentBlock()
}
}
func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
}

@ -18,6 +18,7 @@ package catalyst
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/beacon"
@ -34,11 +35,52 @@ const maxTrackedPayloads = 10
// latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedHeaders = 10
// payload wraps the miner's block production channel, allowing the mined block
// to be retrieved later upon the GetPayload engine API call.
type payload struct {
lock sync.Mutex
done bool
empty *types.Block
block *types.Block
result chan *types.Block
}
// resolve extracts the generated full block from the given channel if possible
// or fallback to empty block as an alternative.
func (req *payload) resolve() *beacon.ExecutableDataV1 {
// this function can be called concurrently, prevent any
// concurrency issue in the first place.
req.lock.Lock()
defer req.lock.Unlock()
// Try to resolve the full block first if it's not obtained
// yet. The returned block can be nil if the generation fails.
if !req.done {
timeout := time.NewTimer(500 * time.Millisecond)
defer timeout.Stop()
select {
case req.block = <-req.result:
req.done = true
case <-timeout.C:
// TODO(rjl49345642, Marius), should we keep this
// 100ms timeout allowance? Why not just use the
// default and then fallback to empty directly?
}
}
if req.block != nil {
return beacon.BlockToExecutableData(req.block)
}
return beacon.BlockToExecutableData(req.empty)
}
// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
type payloadQueueItem struct {
id beacon.PayloadID
payload *beacon.ExecutableDataV1
id beacon.PayloadID
data *payload
}
// payloadQueue tracks the latest handful of constructed payloads to be retrieved
@ -57,14 +99,14 @@ func newPayloadQueue() *payloadQueue {
}
// put inserts a new payload into the queue at the given id.
func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) {
func (q *payloadQueue) put(id beacon.PayloadID, data *payload) {
q.lock.Lock()
defer q.lock.Unlock()
copy(q.payloads[1:], q.payloads)
q.payloads[0] = &payloadQueueItem{
id: id,
payload: data,
id: id,
data: data,
}
}
@ -78,7 +120,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
return nil // no more items
}
if item.id == id {
return item.payload
return item.data.resolve()
}
}
return nil

@ -235,14 +235,32 @@ func (miner *Miner) DisablePreseal() {
miner.worker.disablePreseal()
}
// GetSealingBlock retrieves a sealing block based on the given parameters.
// The returned block is not sealed but all other fields should be filled.
func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
return miner.worker.getSealingBlock(parent, timestamp, coinbase, random)
}
// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return miner.worker.pendingLogsFeed.Subscribe(ch)
}
// GetSealingBlockAsync requests to generate a sealing block according to the
// given parameters. Regardless of whether the generation is successful or not,
// there is always a result that will be returned through the result channel.
// The difference is that if the execution fails, the returned result is nil
// and the concrete error is dropped silently.
func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) {
resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return resCh, nil
}
// GetSealingBlockSync creates a sealing block according to the given parameters.
// If the generation is failed or the underlying work is already closed, an error
// will be returned.
func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) {
resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return <-resCh, <-errCh
}

@ -170,8 +170,8 @@ type newWorkReq struct {
// getWorkReq represents a request for getting a new sealing work with provided parameters.
type getWorkReq struct {
params *generateParams
err error
result chan *types.Block
result chan *types.Block // non-blocking channel
err chan error
}
// intervalAdjust represents a resubmitting interval adjustment.
@ -536,12 +536,12 @@ func (w *worker) mainLoop() {
case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
if err != nil {
req.err = err
req.err <- err
req.result <- nil
} else {
req.err <- nil
req.result <- block
}
case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
@ -969,6 +969,7 @@ type generateParams struct {
random common.Hash // The randomness generated by beacon chain, empty before the merge
noUncle bool // Flag whether the uncle block inclusion is allowed
noExtra bool // Flag whether the extra field assignment is allowed
noTxs bool // Flag whether an empty block without any transaction is expected
}
// prepareWork constructs the sealing task according to the given parameters,
@ -1090,8 +1091,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()
w.fillTransactions(nil, work)
if !params.noTxs {
w.fillTransactions(nil, work)
}
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}
@ -1128,7 +1130,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
work.discard()
return
}
w.commit(work.copy(), w.fullTaskHook, true, start)
// Swap out the old work with the new one, terminating any leftover
@ -1177,7 +1178,13 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
}
// getSealingBlock generates the sealing block based on the given parameters.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
// The generation result will be passed back via the given channel no matter
// the generation itself succeeds or not.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) {
var (
resCh = make(chan *types.Block, 1)
errCh = make(chan error, 1)
)
req := &getWorkReq{
params: &generateParams{
timestamp: timestamp,
@ -1187,18 +1194,16 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase
random: random,
noUncle: true,
noExtra: true,
noTxs: noTxs,
},
result: make(chan *types.Block, 1),
result: resCh,
err: errCh,
}
select {
case w.getWorkCh <- req:
block := <-req.result
if block == nil {
return nil, req.err
}
return block, nil
return resCh, errCh, nil
case <-w.exitCh:
return nil, errors.New("miner closed")
return nil, nil, errors.New("miner closed")
}
}

@ -638,7 +638,9 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
// This API should work even when the automatic sealing is not enabled
for _, c := range cases {
block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random)
resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false)
block := <-resChan
err := <-errChan
if c.expectErr {
if err == nil {
t.Error("Expect error but get nil")
@ -654,7 +656,9 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
// This API should work even when the automatic sealing is enabled
w.start()
for _, c := range cases {
block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random)
resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false)
block := <-resChan
err := <-errChan
if c.expectErr {
if err == nil {
t.Error("Expect error but get nil")

Loading…
Cancel
Save