eth/catalyst: ensure period zero mode leaves no pending txs in pool (#30264)

closes #29475, replaces #29657, #30104 

Fixes two issues. First is a deadlock where the txpool attempts to reorg, but can't complete because there are no readers left for the new txs subscription. Second, resolves a problem with on demand mode where txs may be left pending when there are more pending txs than block space.

Co-authored-by: Martin Holst Swende <martin@swende.se>
pull/30327/head
lightclient 3 months ago committed by GitHub
parent 41b3b30863
commit 84565dc899
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 21
      eth/catalyst/api.go
  2. 87
      eth/catalyst/simulated_beacon.go
  3. 72
      eth/catalyst/simulated_beacon_api.go
  4. 68
      eth/catalyst/simulated_beacon_test.go

@ -184,7 +184,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai"))
}
}
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false)
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1)
}
// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload
@ -207,7 +207,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called with paris and shanghai payloads"))
}
}
return api.forkchoiceUpdated(update, params, engine.PayloadV2, false)
return api.forkchoiceUpdated(update, params, engine.PayloadV2)
}
// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
@ -228,10 +228,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
return api.forkchoiceUpdated(update, params, engine.PayloadV3)
}
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, simulatorMode bool) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion) (engine.ForkChoiceResponse, error) {
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()
@ -374,19 +374,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) {
return valid(&id), nil
}
// If the beacon chain is ran by a simulator, then transaction insertion,
// block insertion and block production will happen without any timing
// delay between them. This will cause flaky simulator executions due to
// the transaction pool running its internal reset operation on a back-
// ground thread. To avoid the racey behavior - in simulator mode - the
// pool will be explicitly blocked on its reset before continuing to the
// block production below.
if simulatorMode {
if err := api.eth.TxPool().Sync(); err != nil {
log.Error("Failed to sync transaction pool", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
}
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
log.Error("Failed to build payload", "err", err)

@ -20,6 +20,7 @@ import (
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"math/big"
"sync"
"time"
@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
@ -41,36 +43,46 @@ const devEpochLength = 32
// withdrawalQueue implements a FIFO queue which holds withdrawals that are
// pending inclusion.
type withdrawalQueue struct {
pending chan *types.Withdrawal
pending types.Withdrawals
mu sync.Mutex
feed event.Feed
subs event.SubscriptionScope
}
type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals }
// add queues a withdrawal for future inclusion.
func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error {
select {
case w.pending <- withdrawal:
break
default:
return errors.New("withdrawal queue full")
}
w.mu.Lock()
w.pending = append(w.pending, withdrawal)
w.mu.Unlock()
w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}})
return nil
}
// gatherPending returns a number of queued withdrawals up to a maximum count.
func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal {
withdrawals := []*types.Withdrawal{}
for {
select {
case withdrawal := <-w.pending:
withdrawals = append(withdrawals, withdrawal)
if len(withdrawals) == maxCount {
return withdrawals
}
default:
return withdrawals
}
}
// pop dequeues the specified number of withdrawals from the queue.
func (w *withdrawalQueue) pop(count int) types.Withdrawals {
w.mu.Lock()
defer w.mu.Unlock()
count = min(count, len(w.pending))
popped := w.pending[0:count]
w.pending = w.pending[count:]
return popped
}
// subscribe allows a listener to be updated when new withdrawals are added to
// the queue.
func (w *withdrawalQueue) subscribe(ch chan<- newWithdrawalsEvent) event.Subscription {
sub := w.feed.Subscribe(ch)
return w.subs.Track(sub)
}
// SimulatedBeacon drives an Ethereum instance as if it were a real beacon
// client. It can run in period mode where it mines a new block every period
// (seconds) or on every transaction via Commit, Fork and AdjustTime.
type SimulatedBeacon struct {
shutdownCh chan struct{}
eth *eth.Ethereum
@ -86,10 +98,6 @@ type SimulatedBeacon struct {
}
// NewSimulatedBeacon constructs a new simulated beacon chain.
// Period sets the period in which blocks should be produced.
//
// - If period is set to 0, a block is produced on every transaction.
// via Commit, Fork and AdjustTime.
func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) {
block := eth.BlockChain().CurrentBlock()
current := engine.ForkchoiceStateV1{
@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
engineAPI: engineAPI,
lastBlockTime: block.Time,
curForkchoiceState: current,
withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)},
}, nil
}
@ -156,6 +163,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
c.setCurrentState(header.Hash(), *finalizedHash)
}
// Because transaction insertion, block insertion, and block production will
// happen without any timing delay between them in simulator mode and the
// transaction pool will be running its internal reset operation on a
// background thread, flaky executions can happen. To avoid the racey
// behavior, the pool will be explicitly blocked on its reset before
// continuing to the block production below.
if err := c.eth.APIBackend.TxPool().Sync(); err != nil {
return fmt.Errorf("failed to sync txpool: %w", err)
}
var random [32]byte
rand.Read(random[:])
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
@ -164,13 +181,14 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
Withdrawals: withdrawals,
Random: random,
BeaconRoot: &common.Hash{},
}, engine.PayloadV3, true)
}, engine.PayloadV3)
if err != nil {
return err
}
if fcResponse == engine.STATUS_SYNCING {
return errors.New("chain rewind prevented invocation of payload creation")
}
envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true)
if err != nil {
return err
@ -223,8 +241,7 @@ func (c *SimulatedBeacon) loop() {
case <-c.shutdownCh:
return
case <-timer.C:
withdrawals := c.withdrawals.gatherPending(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
if err := c.sealBlock(c.withdrawals.pop(10), uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
} else {
timer.Reset(time.Second * time.Duration(c.period))
@ -260,7 +277,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {
// Commit seals a block on demand.
func (c *SimulatedBeacon) Commit() common.Hash {
withdrawals := c.withdrawals.gatherPending(10)
withdrawals := c.withdrawals.pop(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
}
@ -301,16 +318,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if parent == nil {
return errors.New("parent not found")
}
withdrawals := c.withdrawals.gatherPending(10)
withdrawals := c.withdrawals.pop(10)
return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second))
}
// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the
// stack.
func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) {
api := &api{sim}
if sim.period == 0 {
// mine on demand if period is set to 0
go api.loop()
}
api := newSimulatedBeaconAPI(sim)
stack.RegisterAPIs([]rpc.API{
{
Namespace: "dev",

@ -18,44 +18,88 @@ package catalyst
import (
"context"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type api struct {
// simulatedBeaconAPI provides a RPC API for SimulatedBeacon.
type simulatedBeaconAPI struct {
sim *SimulatedBeacon
}
func (a *api) loop() {
// newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a
// buffered commit channel. If period is zero, it starts a goroutine to handle
// new tx events.
func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI {
api := &simulatedBeaconAPI{sim: sim}
if sim.period == 0 {
// mine on demand if period is set to 0
go api.loop()
}
return api
}
// loop is the main loop for the API when it's running in period = 0 mode. It
// ensures that block production is triggered as soon as a new withdrawal or
// transaction is received.
func (a *simulatedBeaconAPI) loop() {
var (
newTxs = make(chan core.NewTxsEvent)
sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newTxs = make(chan core.NewTxsEvent)
newWxs = make(chan newWithdrawalsEvent)
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newWxsSub = a.sim.withdrawals.subscribe(newWxs)
doCommit = make(chan struct{}, 1)
)
defer sub.Unsubscribe()
defer newTxsSub.Unsubscribe()
defer newWxsSub.Unsubscribe()
// A background thread which signals to the simulator when to commit
// based on messages over doCommit.
go func() {
for range doCommit {
a.sim.Commit()
a.sim.eth.TxPool().Sync()
// It's worth noting that in case a tx ends up in the pool listed as
// "executable", but for whatever reason the miner does not include it in
// a block -- maybe the miner is enforcing a higher tip than the pool --
// this code will spinloop.
for {
if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 {
break
}
a.sim.Commit()
}
}
}()
for {
select {
case <-a.sim.shutdownCh:
close(doCommit)
return
case w := <-a.sim.withdrawals.pending:
withdrawals := append(a.sim.withdrawals.gatherPending(9), w)
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
case <-newWxs:
select {
case doCommit <- struct{}{}:
default:
}
case <-newTxs:
a.sim.Commit()
select {
case doCommit <- struct{}{}:
default:
}
}
}
}
func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
// AddWithdrawal adds a withdrawal to the pending queue.
func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
return a.sim.withdrawals.add(withdrawal)
}
func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
// SetFeeRecipient sets the fee recipient for block building purposes.
func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
a.sim.setFeeRecipient(feeRecipient)
}

@ -35,7 +35,7 @@ import (
"github.com/ethereum/go-ethereum/params"
)
func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node.Node, *eth.Ethereum, *SimulatedBeacon) {
func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis, period uint64) (*node.Node, *eth.Ethereum, *SimulatedBeacon) {
t.Helper()
n, err := node.New(&node.Config{
@ -55,7 +55,7 @@ func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node.
t.Fatal("can't create eth service:", err)
}
simBeacon, err := NewSimulatedBeacon(1, ethservice)
simBeacon, err := NewSimulatedBeacon(period, ethservice)
if err != nil {
t.Fatal("can't create simulated beacon:", err)
}
@ -87,7 +87,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
// short period (1 second) for testing purposes
var gasLimit uint64 = 10_000_000
genesis := core.DeveloperGenesisBlock(gasLimit, &testAddr)
node, ethService, mock := startSimulatedBeaconEthService(t, genesis)
node, ethService, mock := startSimulatedBeaconEthService(t, genesis, 1)
_ = mock
defer node.Close()
@ -140,3 +140,65 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
}
}
}
// Tests that zero-period dev mode can handle a lot of simultaneous
// transactions/withdrawals
func TestOnDemandSpam(t *testing.T) {
var (
withdrawals []types.Withdrawal
txs = make(map[common.Hash]*types.Transaction)
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddr = crypto.PubkeyToAddress(testKey.PublicKey)
gasLimit uint64 = 10_000_000
genesis = core.DeveloperGenesisBlock(gasLimit, &testAddr)
node, eth, mock = startSimulatedBeaconEthService(t, genesis, 0)
_ = newSimulatedBeaconAPI(mock)
signer = types.LatestSigner(eth.BlockChain().Config())
chainHeadCh = make(chan core.ChainHeadEvent, 100)
sub = eth.BlockChain().SubscribeChainHeadEvent(chainHeadCh)
)
defer node.Close()
defer sub.Unsubscribe()
// generate some withdrawals
for i := 0; i < 20; i++ {
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
if err := mock.withdrawals.add(&withdrawals[i]); err != nil {
t.Fatal("addWithdrawal failed", err)
}
}
// generate a bunch of transactions
for i := 0; i < 20000; i++ {
tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey)
if err != nil {
t.Fatal("error signing transaction", err)
}
txs[tx.Hash()] = tx
if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil {
t.Fatal("error adding txs to pool", err)
}
}
var (
includedTxs = make(map[common.Hash]struct{})
includedWxs []uint64
)
for {
select {
case evt := <-chainHeadCh:
for _, itx := range evt.Block.Transactions() {
includedTxs[itx.Hash()] = struct{}{}
}
for _, iwx := range evt.Block.Withdrawals() {
includedWxs = append(includedWxs, iwx.Index)
}
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) {
return
}
case <-time.After(10 * time.Second):
t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals))
}
}
}

Loading…
Cancel
Save