eth/fetcher: fix blob transaction propagation (#30125)

This PR fixes an issue with blob transaction propagation due to the blob
transation txpool rejecting transactions with gapped nonces. The
specific changes are:

- fetch transactions from a peer in the order they were announced to
minimize nonce-gaps (which cause blob txs to be rejected

- don't wait on fetching blob transactions after announcement is
received, since they are not broadcast

Testing:
- unit tests updated to reflect that fetch order should always match tx
announcement order
- unit test added to confirm blob transactions are scheduled immediately
for fetching
  - running the PR on an eth mainnet full node without incident so far

---------

Signed-off-by: Roberto Bayardo <bayardo@alum.mit.edu>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
pull/30408/head
Roberto Bayardo 2 weeks ago committed by GitHub
parent 8f4fac7b86
commit 88c8459005
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      cmd/devp2p/internal/ethtest/suite.go
  2. 169
      eth/fetcher/tx_fetcher.go
  3. 88
      eth/fetcher/tx_fetcher_test.go

@ -849,8 +849,17 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
if code, _, err := conn.Read(); err != nil { if code, _, err := conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err: %v", err) t.Fatalf("expected disconnect on blob violation, got err: %v", err)
} else if code != discMsg { } else if code != discMsg {
if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg {
// sometimes we'll get a blob transaction hashes announcement before the disconnect
// because blob transactions are scheduled to be fetched right away.
if code, _, err = conn.Read(); err != nil {
t.Fatalf("expected disconnect on blob violation, got err on second read: %v", err)
}
}
if code != discMsg {
t.Fatalf("expected disconnect on blob violation, got msg code: %d", code) t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
} }
}
conn.Close() conn.Close()
} }
} }

@ -17,7 +17,6 @@
package fetcher package fetcher
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"math" "math"
@ -35,7 +34,7 @@ import (
) )
const ( const (
// maxTxAnnounces is the maximum number of unique transaction a peer // maxTxAnnounces is the maximum number of unique transactions a peer
// can announce in a short time. // can announce in a short time.
maxTxAnnounces = 4096 maxTxAnnounces = 4096
@ -114,16 +113,23 @@ var errTerminated = errors.New("terminated")
type txAnnounce struct { type txAnnounce struct {
origin string // Identifier of the peer originating the notification origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadata associated with the hashes metas []txMetadata // Batch of metadata associated with the hashes
} }
// txMetadata is a set of extra data transmitted along the announcement for better // txMetadata provides the extra data transmitted along with the announcement
// fetch scheduling. // for better fetch scheduling.
type txMetadata struct { type txMetadata struct {
kind byte // Transaction consensus type kind byte // Transaction consensus type
size uint32 // Transaction size in bytes size uint32 // Transaction size in bytes
} }
// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
// tracking the transaction sequence number.
type txMetadataWithSeq struct {
txMetadata
seq uint64
}
// txRequest represents an in-flight transaction retrieval request destined to // txRequest represents an in-flight transaction retrieval request destined to
// a specific peers. // a specific peers.
type txRequest struct { type txRequest struct {
@ -159,7 +165,7 @@ type txDrop struct {
// The invariants of the fetcher are: // The invariants of the fetcher are:
// - Each tracked transaction (hash) must only be present in one of the // - Each tracked transaction (hash) must only be present in one of the
// three stages. This ensures that the fetcher operates akin to a finite // three stages. This ensures that the fetcher operates akin to a finite
// state automata and there's do data leak. // state automata and there's no data leak.
// - Each peer that announced transactions may be scheduled retrievals, but // - Each peer that announced transactions may be scheduled retrievals, but
// only ever one concurrently. This ensures we can immediately know what is // only ever one concurrently. This ensures we can immediately know what is
// missing from a reply and reschedule it. // missing from a reply and reschedule it.
@ -169,17 +175,18 @@ type TxFetcher struct {
drop chan *txDrop drop chan *txDrop
quit chan struct{} quit chan struct{}
txSeq uint64 // Unique transaction sequence number
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch) underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)
// Stage 1: Waiting lists for newly discovered transactions that might be // Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips. // broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection) waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)
// Stage 2: Queue of transactions that waiting to be allocated to some peer // Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly. // to be retrieved directly.
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
// Stage 3: Set of transactions currently being retrieved, some which may be // Stage 3: Set of transactions currently being retrieved, some which may be
@ -218,8 +225,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}), quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}), waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime), waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadata), waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadata), announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}), announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string), fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest), requests: make(map[string]*txRequest),
@ -247,7 +254,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// loop, so anything caught here is time saved internally. // loop, so anything caught here is time saved internally.
var ( var (
unknownHashes = make([]common.Hash, 0, len(hashes)) unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes)) unknownMetas = make([]txMetadata, 0, len(hashes))
duplicate int64 duplicate int64
underpriced int64 underpriced int64
@ -264,7 +271,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
// Transaction metadata has been available since eth68, and all // Transaction metadata has been available since eth68, and all
// legacy eth protocols (prior to eth68) have been deprecated. // legacy eth protocols (prior to eth68) have been deprecated.
// Therefore, metadata is always expected in the announcement. // Therefore, metadata is always expected in the announcement.
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]}) unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
} }
} }
txAnnounceKnownMeter.Mark(duplicate) txAnnounceKnownMeter.Mark(duplicate)
@ -431,9 +438,19 @@ func (f *TxFetcher) loop() {
ann.metas = ann.metas[:want-maxTxAnnounces] ann.metas = ann.metas[:want-maxTxAnnounces]
} }
// All is well, schedule the remainder of the transactions // All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0 var (
_, oldPeer := f.announces[ann.origin] idleWait = len(f.waittime) == 0
_, oldPeer = f.announces[ann.origin]
hasBlob bool
// nextSeq returns the next available sequence number for tagging
// transaction announcement and also bump it internally.
nextSeq = func() uint64 {
seq := f.txSeq
f.txSeq++
return seq
}
)
for i, hash := range ann.hashes { for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list // If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and // of possible alternates (in case the current retrieval fails) and
@ -443,9 +460,17 @@ func (f *TxFetcher) loop() {
// Stage 2 and 3 share the set of origins per tx // Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil { if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i] announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else { } else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
} }
continue continue
} }
@ -456,9 +481,17 @@ func (f *TxFetcher) loop() {
// Stage 2 and 3 share the set of origins per tx // Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil { if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = ann.metas[i] announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else { } else {
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
} }
continue continue
} }
@ -475,24 +508,47 @@ func (f *TxFetcher) loop() {
f.waitlist[hash][ann.origin] = struct{}{} f.waitlist[hash][ann.origin] = struct{}{}
if waitslots := f.waitslots[ann.origin]; waitslots != nil { if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i] waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else { } else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
} }
continue continue
} }
// Transaction unknown to the fetcher, insert it into the waiting list // Transaction unknown to the fetcher, insert it into the waiting list
f.waitlist[hash] = map[string]struct{}{ann.origin: {}} f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
f.waittime[hash] = f.clock.Now()
// Assign the current timestamp as the wait time, but for blob transactions,
// skip the wait time since they are only announced.
if ann.metas[i].kind != types.BlobTxType {
f.waittime[hash] = f.clock.Now()
} else {
hasBlob = true
f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
}
if waitslots := f.waitslots[ann.origin]; waitslots != nil { if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = ann.metas[i] waitslots[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else { } else {
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
} }
} }
// If a new item was added to the waitlist, schedule it into the fetcher // If a new item was added to the waitlist, schedule it into the fetcher
if idleWait && len(f.waittime) > 0 { if hasBlob || (idleWait && len(f.waittime) > 0) {
f.rescheduleWait(waitTimer, waitTrigger) f.rescheduleWait(waitTimer, waitTrigger)
} }
// If this peer is new and announced something already queued, maybe // If this peer is new and announced something already queued, maybe
@ -516,7 +572,7 @@ func (f *TxFetcher) loop() {
if announces := f.announces[peer]; announces != nil { if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash] announces[hash] = f.waitslots[peer][hash]
} else { } else {
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]} f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
} }
delete(f.waitslots[peer], hash) delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 { if len(f.waitslots[peer]) == 0 {
@ -873,7 +929,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
hashes = make([]common.Hash, 0, maxTxRetrievals) hashes = make([]common.Hash, 0, maxTxRetrievals)
bytes uint64 bytes uint64
) )
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool { f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool {
// If the transaction is already fetching, skip to the next one // If the transaction is already fetching, skip to the next one
if _, ok := f.fetching[hash]; ok { if _, ok := f.fetching[hash]; ok {
return true return true
@ -938,28 +994,26 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
} }
} }
// forEachAnnounce does a range loop over a map of announcements in production, // forEachAnnounce loops over the given announcements in arrival order, invoking
// but during testing it does a deterministic sorted random to allow reproducing // the do function for each until it returns false. We enforce an arrival
// issues. // ordering to minimize the chances of transaction nonce-gaps, which result in
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) { // transactions being rejected by the txpool.
// If we're running production, use whatever Go's map gives us func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
if f.rand == nil { type announcement struct {
for hash, meta := range announces { hash common.Hash
if !do(hash, meta) { meta txMetadata
return seq uint64
}
} }
return // Process announcements by their arrival order
list := make([]announcement, 0, len(announces))
for hash, entry := range announces {
list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
} }
// We're running the test suite, make iteration deterministic sort.Slice(list, func(i, j int) bool {
list := make([]common.Hash, 0, len(announces)) return list[i].seq < list[j].seq
for hash := range announces { })
list = append(list, hash) for i := range list {
} if !do(list[i].hash, list[i].meta) {
sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list {
if !do(hash, announces[hash]) {
return return
} }
} }
@ -975,26 +1029,3 @@ func rotateStrings(slice []string, n int) {
slice[i] = orig[(i+n)%len(orig)] slice[i] = orig[(i+n)%len(orig)]
} }
} }
// sortHashes sorts a slice of hashes. This method is only used in tests in order
// to simulate random map iteration but keep it deterministic.
func sortHashes(slice []common.Hash) {
for i := 0; i < len(slice); i++ {
for j := i + 1; j < len(slice); j++ {
if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
slice[i], slice[j] = slice[j], slice[i]
}
}
}
}
// rotateHashes rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
orig := make([]common.Hash, len(slice))
copy(orig, slice)
for i := 0; i < len(orig); i++ {
slice[i] = orig[(i+n)%len(orig)]
}
}

@ -701,7 +701,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
}, },
// Deliver the middle transaction requested, the one before which // Deliver the middle transaction requested, the one before which
// should be dropped and the one after re-requested. // should be dropped and the one after re-requested.
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, // This depends on the deterministic random doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true},
isScheduled{ isScheduled{
tracking: map[string][]announce{ tracking: map[string][]announce{
"A": { "A": {
@ -1070,7 +1070,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
"A": announces, "A": announces,
}, },
fetching: map[string][]common.Hash{ fetching: map[string][]common.Hash{
"A": hashes[1643 : 1643+maxTxRetrievals], "A": hashes[:maxTxRetrievals],
}, },
}, },
}, },
@ -1130,9 +1130,9 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
}, },
}, },
fetching: map[string][]common.Hash{ fetching: map[string][]common.Hash{
"A": {{0x02}, {0x03}, {0x04}}, "A": {{0x01}, {0x02}, {0x03}},
"B": {{0x06}}, "B": {{0x05}},
"C": {{0x08}}, "C": {{0x07}},
}, },
}, },
}, },
@ -1209,8 +1209,8 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
"B": announceB[:maxTxAnnounces/2-1], "B": announceB[:maxTxAnnounces/2-1],
}, },
fetching: map[string][]common.Hash{ fetching: map[string][]common.Hash{
"A": hashesA[1643 : 1643+maxTxRetrievals], "A": hashesA[:maxTxRetrievals],
"B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), "B": hashesB[:maxTxRetrievals],
}, },
}, },
// Ensure that adding even one more hash results in dropping the hash // Ensure that adding even one more hash results in dropping the hash
@ -1227,8 +1227,8 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
"B": announceB[:maxTxAnnounces/2-1], "B": announceB[:maxTxAnnounces/2-1],
}, },
fetching: map[string][]common.Hash{ fetching: map[string][]common.Hash{
"A": hashesA[1643 : 1643+maxTxRetrievals], "A": hashesA[:maxTxRetrievals],
"B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), "B": hashesB[:maxTxRetrievals],
}, },
}, },
}, },
@ -1759,6 +1759,76 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
}) })
} }
// This test ensures the blob transactions will be scheduled for fetching
// once they are announced in the network.
func TestBlobTransactionAnnounce(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
// Announce a blob transaction
doTxNotify{peer: "B", hashes: []common.Hash{{0x03}}, types: []byte{types.BlobTxType}, sizes: []uint32{333}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x03}, types.BlobTxType, 333},
},
}),
doWait{time: 0, step: true}, // zero time, but the blob fetching should be scheduled
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{
tracking: map[string][]announce{
"B": {
{common.Hash{0x03}, types.BlobTxType, 333},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"B": {{0x03}},
},
},
doWait{time: txArriveTimeout, step: true}, // zero time, but the blob fetching should be scheduled
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x03}, types.BlobTxType, 333},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"A": {{0x01}, {0x02}},
"B": {{0x03}},
},
},
},
})
}
func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) { func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) {
t.Parallel() t.Parallel()
testTransactionFetcher(t, tt) testTransactionFetcher(t, tt)

Loading…
Cancel
Save