core, eth, miner: start propagating and consuming blob txs (#28243)

* core, eth, miner: start propagating and consuming blob txs

* eth/protocols/eth: disable eth/67 if Cancun is enabled

* core/txpool, eth, miner: pass gas limit infos in lazy tx for mienr filtering

* core/txpool, miner: add lazy resolver for pending txs too

* core, eth: fix review noticed bugs

* eth, miner: minor polishes in the mining and announcing logs

* core/expool: unsubscribe the event scope
pull/28254/head
Péter Szilágyi 12 months ago committed by GitHub
parent bc6d184872
commit a8a9c8e4b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 55
      core/txpool/blobpool/blobpool.go
  2. 18
      core/txpool/legacypool/legacypool.go
  3. 19
      core/txpool/subpool.go
  4. 12
      core/txpool/txpool.go
  5. 2
      eth/api_backend.go
  6. 2
      eth/catalyst/simulated_beacon.go
  7. 41
      eth/handler.go
  8. 14
      eth/handler_eth.go
  9. 4
      eth/handler_eth_test.go
  10. 6
      eth/handler_test.go
  11. 4
      eth/protocols/eth/handler.go
  12. 4
      eth/protocols/eth/handlers.go
  13. 4
      miner/ordering_test.go
  14. 33
      miner/worker.go

@ -97,6 +97,8 @@ type blobTxMeta struct {
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
execFeeCap *uint256.Int // Needed to validate replacement price bump execFeeCap *uint256.Int // Needed to validate replacement price bump
blobFeeCap *uint256.Int // Needed to validate replacement price bump blobFeeCap *uint256.Int // Needed to validate replacement price bump
execGas uint64 // Needed to check inclusion validity before reading the blob
blobGas uint64 // Needed to check inclusion validity before reading the blob
basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
@ -118,6 +120,8 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
execTipCap: uint256.MustFromBig(tx.GasTipCap()), execTipCap: uint256.MustFromBig(tx.GasTipCap()),
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),
} }
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
@ -307,8 +311,8 @@ type BlobPool struct {
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full evict *evictHeap // Heap of cheapest accounts for eviction when full
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
lock sync.RWMutex // Mutex protecting the pool during reorg handling lock sync.RWMutex // Mutex protecting the pool during reorg handling
} }
@ -436,8 +440,6 @@ func (p *BlobPool) Close() error {
if err := p.store.Close(); err != nil { if err := p.store.Close(); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
p.eventScope.Close()
switch { switch {
case errs == nil: case errs == nil:
return nil return nil
@ -758,15 +760,21 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
// Run the reorg between the old and new head and figure out which accounts // Run the reorg between the old and new head and figure out which accounts
// need to be rechecked and which transactions need to be readded // need to be rechecked and which transactions need to be readded
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil { if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
var adds []*types.Transaction
for addr, txs := range reinject { for addr, txs := range reinject {
// Blindly push all the lost transactions back into the pool // Blindly push all the lost transactions back into the pool
for _, tx := range txs { for _, tx := range txs {
p.reinject(addr, tx.Hash()) if err := p.reinject(addr, tx.Hash()); err == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
} }
// Recheck the account's pooled transactions to drop included and // Recheck the account's pooled transactions to drop included and
// invalidated one // invalidated one
p.recheck(addr, inclusions) p.recheck(addr, inclusions)
} }
if len(adds) > 0 {
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
} }
// Flush out any blobs from limbo that are older than the latest finality // Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) { if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
@ -921,13 +929,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will // Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual // be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool. // transactions are injected back into the pool.
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) { func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot // Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable. // add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash) tx, err := p.limbo.pull(txhash)
if err != nil { if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err) log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return return err
} }
// TODO: seems like an easy optimization here would be getting the serialized tx // TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here. // from limbo instead of re-serializing it here.
@ -936,12 +944,12 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
blob, err := rlp.EncodeToBytes(tx) blob, err := rlp.EncodeToBytes(tx)
if err != nil { if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return return err
} }
id, err := p.store.Put(blob) id, err := p.store.Put(blob)
if err != nil { if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return return err
} }
// Update the indixes and metrics // Update the indixes and metrics
@ -949,7 +957,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
if _, ok := p.index[addr]; !ok { if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil { if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
return return err
} }
p.index[addr] = []*blobTxMeta{meta} p.index[addr] = []*blobTxMeta{meta}
p.spent[addr] = meta.costCap p.spent[addr] = meta.costCap
@ -960,6 +968,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
} }
p.lookup[meta.hash] = meta.id p.lookup[meta.hash] = meta.id
p.stored += uint64(meta.size) p.stored += uint64(meta.size)
return nil
} }
// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements // SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
@ -1154,9 +1163,19 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
// Add inserts a set of blob transactions into the pool if they pass validation (both // Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions). // consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs)) var (
adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))
)
for i, tx := range txs { for i, tx := range txs {
errs[i] = p.add(tx) errs[i] = p.add(tx)
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
} }
return errs return errs
} }
@ -1384,6 +1403,8 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
Time: time.Now(), // TODO(karalabe): Maybe save these and use that? Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap.ToBig(), GasFeeCap: tx.execFeeCap.ToBig(),
GasTipCap: tx.execTipCap.ToBig(), GasTipCap: tx.execTipCap.ToBig(),
Gas: tx.execGas,
BlobGas: tx.blobGas,
}) })
} }
if len(lazies) > 0 { if len(lazies) > 0 {
@ -1468,10 +1489,14 @@ func (p *BlobPool) updateLimboMetrics() {
limboSlotusedGauge.Update(int64(slotused)) limboSlotusedGauge.Update(int64(slotused))
} }
// SubscribeTransactions registers a subscription of NewTxsEvent and // SubscribeTransactions registers a subscription for new transaction events,
// starts sending event to the given channel. // supporting feeding only newly seen or also resurrected transactions.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
return p.eventScope.Track(p.eventFeed.Subscribe(ch)) if reorgs {
return p.insertFeed.Subscribe(ch)
} else {
return p.discoverFeed.Subscribe(ch)
}
} }
// Nonce returns the next nonce of an account, with all transactions executable // Nonce returns the next nonce of an account, with all transactions executable

@ -208,7 +208,6 @@ type LegacyPool struct {
chain BlockChain chain BlockChain
gasTip atomic.Pointer[big.Int] gasTip atomic.Pointer[big.Int]
txFeed event.Feed txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer signer types.Signer
mu sync.RWMutex mu sync.RWMutex
@ -404,9 +403,6 @@ func (pool *LegacyPool) loop() {
// Close terminates the transaction pool. // Close terminates the transaction pool.
func (pool *LegacyPool) Close() error { func (pool *LegacyPool) Close() error {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()
// Terminate the pool reorger and return // Terminate the pool reorger and return
close(pool.reorgShutdownCh) close(pool.reorgShutdownCh)
pool.wg.Wait() pool.wg.Wait()
@ -425,10 +421,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait <-wait
} }
// SubscribeTransactions registers a subscription of NewTxsEvent and // SubscribeTransactions registers a subscription for new transaction events,
// starts sending event to the given channel. // supporting feeding only newly seen or also resurrected transactions.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch)) // The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transaction from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and
// reorgs run lazily, so separating the two would need a marker.
return pool.txFeed.Subscribe(ch)
} }
// SetGasTip updates the minimum gas tip required by the transaction pool for a // SetGasTip updates the minimum gas tip required by the transaction pool for a
@ -552,6 +552,8 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
Time: txs[i].Time(), Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(), GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(), GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
} }
} }
pending[addr] = lazies pending[addr] = lazies

@ -30,13 +30,16 @@ import (
// enough for the miner and other APIs to handle large batches of transactions; // enough for the miner and other APIs to handle large batches of transactions;
// and supports pulling up the entire transaction when really needed. // and supports pulling up the entire transaction when really needed.
type LazyTransaction struct { type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up Pool LazyResolver // Transaction resolver to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved Tx *types.Transaction // Transaction if already resolved
Time time.Time // Time when the transaction was first seen Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction
} }
// Resolve retrieves the full transaction belonging to a lazy handle if it is still // Resolve retrieves the full transaction belonging to a lazy handle if it is still
@ -48,6 +51,14 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
return ltx.Tx return ltx.Tx
} }
// LazyResolver is a minimal interface needed for a transaction pool to satisfy
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
// pool being injected into the lazy transaction.
type LazyResolver interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction
}
// AddressReserver is passed by the main transaction pool to subpools, so they // AddressReserver is passed by the main transaction pool to subpools, so they
// may request (and relinquish) exclusive access to certain addresses. // may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error type AddressReserver func(addr common.Address, reserve bool) error
@ -99,8 +110,10 @@ type SubPool interface {
// account and sorted by nonce. // account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction Pending(enforceTips bool) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events. // SubscribeTransactions subscribes to new transaction events. The subscriber
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription // can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
// Nonce returns the next nonce of an account, with all transactions executable // Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top. // by the pool already applied on top.

@ -155,13 +155,15 @@ func (p *TxPool) Close() error {
if err := <-errc; err != nil { if err := <-errc; err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
// Terminate each subpool // Terminate each subpool
for _, subpool := range p.subpools { for _, subpool := range p.subpools {
if err := subpool.Close(); err != nil { if err := subpool.Close(); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
} }
// Unsubscribe anyone still listening for tx events
p.subs.Close()
if len(errs) > 0 { if len(errs) > 0 {
return fmt.Errorf("subpool close errors: %v", errs) return fmt.Errorf("subpool close errors: %v", errs)
} }
@ -316,12 +318,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
return txs return txs
} }
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending // SubscribeTransactions registers a subscription for new transaction events,
// events to the given channel. // supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
subs := make([]event.Subscription, len(p.subpools)) subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools { for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch) subs[i] = subpool.SubscribeTransactions(ch, reorgs)
} }
return p.subs.Track(event.JoinSubscriptions(subs...)) return p.subs.Track(event.JoinSubscriptions(subs...))
} }

@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
} }
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeNewTxsEvent(ch) return b.eth.txPool.SubscribeTransactions(ch, true)
} }
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress { func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {

@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
func (c *SimulatedBeacon) loopOnDemand() { func (c *SimulatedBeacon) loopOnDemand() {
var ( var (
newTxs = make(chan core.NewTxsEvent) newTxs = make(chan core.NewTxsEvent)
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs) sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
) )
defer sub.Unsubscribe() defer sub.Unsubscribe()

@ -75,9 +75,10 @@ type txPool interface {
// The slice should be modifiable by the caller. // The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
// SubscribeNewTxsEvent should return an event subscription of // SubscribeTransactions subscribes to new transaction events. The subscriber
// NewTxsEvent and send events to the given channel. // can decide whether to receive notifications only for newly seen transactions
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription // or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
} }
// handlerConfig is the collection of initialization parameters to create a full // handlerConfig is the collection of initialization parameters to create a full
@ -509,10 +510,10 @@ func (h *handler) unregisterPeer(id string) {
func (h *handler) Start(maxPeers int) { func (h *handler) Start(maxPeers int) {
h.maxPeers = maxPeers h.maxPeers = maxPeers
// broadcast transactions // broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1) h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize) h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh) h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop() go h.txBroadcastLoop()
// broadcast mined blocks // broadcast mined blocks
@ -592,26 +593,33 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
} }
// BroadcastTransactions will propagate a batch of transactions // BroadcastTransactions will propagate a batch of transactions
// - To a square root of all peers // - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to // - And, separately, as announcements to all peers which are not known to
// already have the given transaction. // already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) { func (h *handler) BroadcastTransactions(txs types.Transactions) {
var ( var (
annoCount int // Count of announcements made blobTxs int // Number of blob transactions to announce only
annoPeers int largeTxs int // Number of large transactions to announce only
directCount int // Count of the txs sent directly to peers
directPeers int // Count of the peers that were sent transactions directly directCount int // Number of transactions sent directly to peers (duplicates included)
directPeers int // Number of peers that were sent transactions directly
annCount int // Number of transactions announced across all peers (duplicates included)
annPeers int // Number of peers announced about transactions
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
) )
// Broadcast transactions to a batch of peers not knowing about it // Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs { for _, tx := range txs {
peers := h.peers.peersWithoutTransaction(tx.Hash()) peers := h.peers.peersWithoutTransaction(tx.Hash())
var numDirect int var numDirect int
if tx.Size() <= txMaxBroadcastSize { switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
numDirect = int(math.Sqrt(float64(len(peers)))) numDirect = int(math.Sqrt(float64(len(peers))))
} }
// Send the tx unconditionally to a subset of our peers // Send the tx unconditionally to a subset of our peers
@ -629,13 +637,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
peer.AsyncSendTransactions(hashes) peer.AsyncSendTransactions(hashes)
} }
for peer, hashes := range annos { for peer, hashes := range annos {
annoPeers++ annPeers++
annoCount += len(hashes) annCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes) peer.AsyncSendPooledTransactionHashes(hashes)
} }
log.Debug("Transaction broadcast", "txs", len(txs), log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
"announce packs", annoPeers, "announced hashes", annoCount, "bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount)
"tx packs", directPeers, "broadcast txs", directCount)
} }
// minedBroadcastLoop sends mined blocks to connected peers. // minedBroadcastLoop sends mined blocks to connected peers.

@ -17,6 +17,7 @@
package eth package eth
import ( import (
"errors"
"fmt" "fmt"
"math/big" "math/big"
"time" "time"
@ -73,6 +74,11 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.txFetcher.Notify(peer.ID(), packet.Hashes) return h.txFetcher.Notify(peer.ID(), packet.Hashes)
case *eth.TransactionsPacket: case *eth.TransactionsPacket:
for _, tx := range *packet {
if tx.Type() == types.BlobTxType {
return errors.New("disallowed broadcast blob transaction")
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, false) return h.txFetcher.Enqueue(peer.ID(), *packet, false)
case *eth.PooledTransactionsResponse: case *eth.PooledTransactionsResponse:
@ -90,9 +96,7 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,
// the chain already entered the pos stage and disconnect the // the chain already entered the pos stage and disconnect the
// remote peer. // remote peer.
if h.merger.PoSFinalized() { if h.merger.PoSFinalized() {
// TODO (MariusVanDerWijden) drop non-updated peers after the merge return errors.New("disallowed block announcement")
return nil
// return errors.New("unexpected block announces")
} }
// Schedule all the unknown hashes for retrieval // Schedule all the unknown hashes for retrieval
var ( var (
@ -118,9 +122,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
// the chain already entered the pos stage and disconnect the // the chain already entered the pos stage and disconnect the
// remote peer. // remote peer.
if h.merger.PoSFinalized() { if h.merger.PoSFinalized() {
// TODO (MariusVanDerWijden) drop non-updated peers after the merge return errors.New("disallowed block broadcast")
return nil
// return errors.New("unexpected block announces")
} }
// Schedule the block for import // Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block) h.blockFetcher.Enqueue(peer.ID(), block)

@ -249,7 +249,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
handler.handler.synced.Store(true) // mark synced to accept transactions handler.handler.synced.Store(true) // mark synced to accept transactions
txs := make(chan core.NewTxsEvent) txs := make(chan core.NewTxsEvent)
sub := handler.txpool.SubscribeNewTxsEvent(txs) sub := handler.txpool.SubscribeTransactions(txs, false)
defer sub.Unsubscribe() defer sub.Unsubscribe()
// Create a source peer to send messages through and a sink handler to receive them // Create a source peer to send messages through and a sink handler to receive them
@ -424,7 +424,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
for i := 0; i < len(sinks); i++ { for i := 0; i < len(sinks); i++ {
txChs[i] = make(chan core.NewTxsEvent, 1024) txChs[i] = make(chan core.NewTxsEvent, 1024)
sub := sinks[i].txpool.SubscribeNewTxsEvent(txChs[i]) sub := sinks[i].txpool.SubscribeTransactions(txChs[i], false)
defer sub.Unsubscribe() defer sub.Unsubscribe()
} }
// Fill the source pool with transactions and wait for them at the sinks // Fill the source pool with transactions and wait for them at the sinks

@ -113,15 +113,17 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.Lazy
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(), GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}) })
} }
} }
return pending return pending
} }
// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and // SubscribeTransactions should return an event subscription of NewTxsEvent and
// send events to the given channel. // send events to the given channel.
func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
return p.txFeed.Subscribe(ch) return p.txFeed.Subscribe(ch)
} }

@ -93,6 +93,10 @@ type TxPool interface {
func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol {
protocols := make([]p2p.Protocol, 0, len(ProtocolVersions)) protocols := make([]p2p.Protocol, 0, len(ProtocolVersions))
for _, version := range ProtocolVersions { for _, version := range ProtocolVersions {
// Blob transactions require eth/68 announcements, disable everything else
if version <= ETH67 && backend.Chain().Config().CancunTime != nil {
continue
}
version := version // Closure version := version // Closure
protocols = append(protocols, p2p.Protocol{ protocols = append(protocols, p2p.Protocol{

@ -426,11 +426,11 @@ func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error
if err := msg.Decode(&query); err != nil { if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest, peer) hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest)
return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs) return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs)
} }
func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsRequest, peer *Peer) ([]common.Hash, []rlp.RawValue) { func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsRequest) ([]common.Hash, []rlp.RawValue) {
// Gather transactions until the fetch or network limits is reached // Gather transactions until the fetch or network limits is reached
var ( var (
bytes int bytes int

@ -92,6 +92,8 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(), GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}) })
} }
expectedCount += count expectedCount += count
@ -157,6 +159,8 @@ func TestTransactionTimeSort(t *testing.T) {
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(), GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}) })
} }
// Sort the transactions and cross check the nonce ordering // Sort the transactions and cross check the nonce ordering

@ -263,8 +263,8 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration), resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
} }
// Subscribe NewTxsEvent for tx pool // Subscribe for transaction insertion events (whether from network or resurrects)
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true)
// Subscribe events for blockchain // Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
@ -542,11 +542,14 @@ func (w *worker) mainLoop() {
for _, tx := range ev.Txs { for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx) acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], &txpool.LazyTransaction{ txs[acc] = append(txs[acc], &txpool.LazyTransaction{
Pool: w.eth.TxPool(), // We don't know where this came from, yolo resolve from everywhere
Hash: tx.Hash(), Hash: tx.Hash(),
Tx: tx.WithoutBlobTxSidecar(), Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(), GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}) })
} }
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
@ -742,7 +745,6 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
if tx.Type() == types.BlobTxType { if tx.Type() == types.BlobTxType {
return w.commitBlobTransaction(env, tx) return w.commitBlobTransaction(env, tx)
} }
receipt, err := w.applyTransaction(env, tx) receipt, err := w.applyTransaction(env, tx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -764,7 +766,6 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction)
if (env.blobs+len(sc.Blobs))*params.BlobTxBlobGasPerBlob > params.MaxBlobGasPerBlock { if (env.blobs+len(sc.Blobs))*params.BlobTxBlobGasPerBlob > params.MaxBlobGasPerBlock {
return nil, errors.New("max data blobs reached") return nil, errors.New("max data blobs reached")
} }
receipt, err := w.applyTransaction(env, tx) receipt, err := w.applyTransaction(env, tx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -815,13 +816,24 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
if ltx == nil { if ltx == nil {
break break
} }
// If we don't have enough space for the next transaction, skip the account.
if env.gasPool.Gas() < ltx.Gas {
log.Trace("Not enough gas left for transaction", "hash", ltx.Hash, "left", env.gasPool.Gas(), "needed", ltx.Gas)
txs.Pop()
continue
}
if left := uint64(params.MaxBlobGasPerBlock - env.blobs*params.BlobTxBlobGasPerBlob); left < ltx.BlobGas {
log.Trace("Not enough blob gas left for transaction", "hash", ltx.Hash, "left", left, "needed", ltx.BlobGas)
txs.Pop()
continue
}
// Transaction seems to fit, pull it up from the pool
tx := ltx.Resolve() tx := ltx.Resolve()
if tx == nil { if tx == nil {
log.Warn("Ignoring evicted transaction") log.Trace("Ignoring evicted transaction", "hash", ltx.Hash)
txs.Pop() txs.Pop()
continue continue
} }
// Error may be ignored here. The error has already been checked // Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool. // during transaction acceptance is the transaction pool.
from, _ := types.Sender(env.signer, tx) from, _ := types.Sender(env.signer, tx)
@ -829,11 +841,10 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
// Check whether the tx is replay protected. If we're not in the EIP155 hf // Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do. // phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
log.Trace("Ignoring replay protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", w.chainConfig.EIP155Block)
txs.Pop() txs.Pop()
continue continue
} }
// Start executing the transaction // Start executing the transaction
env.state.SetTxContext(tx.Hash(), env.tcount) env.state.SetTxContext(tx.Hash(), env.tcount)
@ -841,7 +852,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
switch { switch {
case errors.Is(err, core.ErrNonceTooLow): case errors.Is(err, core.ErrNonceTooLow):
// New head notification data race between the transaction pool and miner, shift // New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) log.Trace("Skipping transaction with low nonce", "hash", ltx.Hash, "sender", from, "nonce", tx.Nonce())
txs.Shift() txs.Shift()
case errors.Is(err, nil): case errors.Is(err, nil):
@ -853,7 +864,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
default: default:
// Transaction is regarded as invalid, drop all consecutive transactions from // Transaction is regarded as invalid, drop all consecutive transactions from
// the same sender because of `nonce-too-high` clause. // the same sender because of `nonce-too-high` clause.
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) log.Debug("Transaction failed, account skipped", "hash", ltx.Hash, "err", err)
txs.Pop() txs.Pop()
} }
} }

Loading…
Cancel
Save