diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index b5cc27a2b5..5cb9fa0297 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -849,7 +849,16 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { if code, _, err := conn.Read(); err != nil { t.Fatalf("expected disconnect on blob violation, got err: %v", err) } else if code != discMsg { - t.Fatalf("expected disconnect on blob violation, got msg code: %d", code) + if code == protoOffset(ethProto)+eth.NewPooledTransactionHashesMsg { + // sometimes we'll get a blob transaction hashes announcement before the disconnect + // because blob transactions are scheduled to be fetched right away. + if code, _, err = conn.Read(); err != nil { + t.Fatalf("expected disconnect on blob violation, got err on second read: %v", err) + } + } + if code != discMsg { + t.Fatalf("expected disconnect on blob violation, got msg code: %d", code) + } } conn.Close() } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index a113155009..97d1e29862 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -17,7 +17,6 @@ package fetcher import ( - "bytes" "errors" "fmt" "math" @@ -35,7 +34,7 @@ import ( ) const ( - // maxTxAnnounces is the maximum number of unique transaction a peer + // maxTxAnnounces is the maximum number of unique transactions a peer // can announce in a short time. maxTxAnnounces = 4096 @@ -114,16 +113,23 @@ var errTerminated = errors.New("terminated") type txAnnounce struct { origin string // Identifier of the peer originating the notification hashes []common.Hash // Batch of transaction hashes being announced - metas []*txMetadata // Batch of metadata associated with the hashes + metas []txMetadata // Batch of metadata associated with the hashes } -// txMetadata is a set of extra data transmitted along the announcement for better -// fetch scheduling. +// txMetadata provides the extra data transmitted along with the announcement +// for better fetch scheduling. type txMetadata struct { kind byte // Transaction consensus type size uint32 // Transaction size in bytes } +// txMetadataWithSeq is a wrapper of transaction metadata with an extra field +// tracking the transaction sequence number. +type txMetadataWithSeq struct { + txMetadata + seq uint64 +} + // txRequest represents an in-flight transaction retrieval request destined to // a specific peers. type txRequest struct { @@ -159,7 +165,7 @@ type txDrop struct { // The invariants of the fetcher are: // - Each tracked transaction (hash) must only be present in one of the // three stages. This ensures that the fetcher operates akin to a finite -// state automata and there's do data leak. +// state automata and there's no data leak. // - Each peer that announced transactions may be scheduled retrievals, but // only ever one concurrently. This ensures we can immediately know what is // missing from a reply and reschedule it. @@ -169,18 +175,19 @@ type TxFetcher struct { drop chan *txDrop quit chan struct{} + txSeq uint64 // Unique transaction sequence number underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch) // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. - waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast - waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist - waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection) + waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast + waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist + waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection) // Stage 2: Queue of transactions that waiting to be allocated to some peer // to be retrieved directly. - announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer - announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash + announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer + announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash // Stage 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the @@ -218,8 +225,8 @@ func NewTxFetcherForTests( quit: make(chan struct{}), waitlist: make(map[common.Hash]map[string]struct{}), waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]*txMetadata), - announces: make(map[string]map[common.Hash]*txMetadata), + waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), + announces: make(map[string]map[common.Hash]*txMetadataWithSeq), announced: make(map[common.Hash]map[string]struct{}), fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), @@ -247,7 +254,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c // loop, so anything caught here is time saved internally. var ( unknownHashes = make([]common.Hash, 0, len(hashes)) - unknownMetas = make([]*txMetadata, 0, len(hashes)) + unknownMetas = make([]txMetadata, 0, len(hashes)) duplicate int64 underpriced int64 @@ -264,7 +271,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c // Transaction metadata has been available since eth68, and all // legacy eth protocols (prior to eth68) have been deprecated. // Therefore, metadata is always expected in the announcement. - unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]}) + unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]}) } } txAnnounceKnownMeter.Mark(duplicate) @@ -431,9 +438,19 @@ func (f *TxFetcher) loop() { ann.metas = ann.metas[:want-maxTxAnnounces] } // All is well, schedule the remainder of the transactions - idleWait := len(f.waittime) == 0 - _, oldPeer := f.announces[ann.origin] - + var ( + idleWait = len(f.waittime) == 0 + _, oldPeer = f.announces[ann.origin] + hasBlob bool + + // nextSeq returns the next available sequence number for tagging + // transaction announcement and also bump it internally. + nextSeq = func() uint64 { + seq := f.txSeq + f.txSeq++ + return seq + } + ) for i, hash := range ann.hashes { // If the transaction is already downloading, add it to the list // of possible alternates (in case the current retrieval fails) and @@ -443,9 +460,17 @@ func (f *TxFetcher) loop() { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = ann.metas[i] + announces[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: nextSeq(), + } } else { - f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: nextSeq(), + }, + } } continue } @@ -456,9 +481,17 @@ func (f *TxFetcher) loop() { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = ann.metas[i] + announces[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: nextSeq(), + } } else { - f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} + f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: nextSeq(), + }, + } } continue } @@ -475,24 +508,47 @@ func (f *TxFetcher) loop() { f.waitlist[hash][ann.origin] = struct{}{} if waitslots := f.waitslots[ann.origin]; waitslots != nil { - waitslots[hash] = ann.metas[i] + waitslots[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: nextSeq(), + } } else { - f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} + f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: nextSeq(), + }, + } } continue } // Transaction unknown to the fetcher, insert it into the waiting list f.waitlist[hash] = map[string]struct{}{ann.origin: {}} - f.waittime[hash] = f.clock.Now() + // Assign the current timestamp as the wait time, but for blob transactions, + // skip the wait time since they are only announced. + if ann.metas[i].kind != types.BlobTxType { + f.waittime[hash] = f.clock.Now() + } else { + hasBlob = true + f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout) + } if waitslots := f.waitslots[ann.origin]; waitslots != nil { - waitslots[hash] = ann.metas[i] + waitslots[hash] = &txMetadataWithSeq{ + txMetadata: ann.metas[i], + seq: nextSeq(), + } } else { - f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]} + f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{ + hash: { + txMetadata: ann.metas[i], + seq: nextSeq(), + }, + } } } // If a new item was added to the waitlist, schedule it into the fetcher - if idleWait && len(f.waittime) > 0 { + if hasBlob || (idleWait && len(f.waittime) > 0) { f.rescheduleWait(waitTimer, waitTrigger) } // If this peer is new and announced something already queued, maybe @@ -516,7 +572,7 @@ func (f *TxFetcher) loop() { if announces := f.announces[peer]; announces != nil { announces[hash] = f.waitslots[peer][hash] } else { - f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]} + f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]} } delete(f.waitslots[peer], hash) if len(f.waitslots[peer]) == 0 { @@ -873,7 +929,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, hashes = make([]common.Hash, 0, maxTxRetrievals) bytes uint64 ) - f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool { + f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool { // If the transaction is already fetching, skip to the next one if _, ok := f.fetching[hash]; ok { return true @@ -938,28 +994,26 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) } } -// forEachAnnounce does a range loop over a map of announcements in production, -// but during testing it does a deterministic sorted random to allow reproducing -// issues. -func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) { - // If we're running production, use whatever Go's map gives us - if f.rand == nil { - for hash, meta := range announces { - if !do(hash, meta) { - return - } - } - return +// forEachAnnounce loops over the given announcements in arrival order, invoking +// the do function for each until it returns false. We enforce an arrival +// ordering to minimize the chances of transaction nonce-gaps, which result in +// transactions being rejected by the txpool. +func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) { + type announcement struct { + hash common.Hash + meta txMetadata + seq uint64 } - // We're running the test suite, make iteration deterministic - list := make([]common.Hash, 0, len(announces)) - for hash := range announces { - list = append(list, hash) + // Process announcements by their arrival order + list := make([]announcement, 0, len(announces)) + for hash, entry := range announces { + list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq}) } - sortHashes(list) - rotateHashes(list, f.rand.Intn(len(list))) - for _, hash := range list { - if !do(hash, announces[hash]) { + sort.Slice(list, func(i, j int) bool { + return list[i].seq < list[j].seq + }) + for i := range list { + if !do(list[i].hash, list[i].meta) { return } } @@ -975,26 +1029,3 @@ func rotateStrings(slice []string, n int) { slice[i] = orig[(i+n)%len(orig)] } } - -// sortHashes sorts a slice of hashes. This method is only used in tests in order -// to simulate random map iteration but keep it deterministic. -func sortHashes(slice []common.Hash) { - for i := 0; i < len(slice); i++ { - for j := i + 1; j < len(slice); j++ { - if bytes.Compare(slice[i][:], slice[j][:]) > 0 { - slice[i], slice[j] = slice[j], slice[i] - } - } - } -} - -// rotateHashes rotates the contents of a slice by n steps. This method is only -// used in tests to simulate random map iteration but keep it deterministic. -func rotateHashes(slice []common.Hash, n int) { - orig := make([]common.Hash, len(slice)) - copy(orig, slice) - - for i := 0; i < len(orig); i++ { - slice[i] = orig[(i+n)%len(orig)] - } -} diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 0b47646669..f80b1d6096 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -701,7 +701,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { }, // Deliver the middle transaction requested, the one before which // should be dropped and the one after re-requested. - doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, // This depends on the deterministic random + doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true}, isScheduled{ tracking: map[string][]announce{ "A": { @@ -1070,7 +1070,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) { "A": announces, }, fetching: map[string][]common.Hash{ - "A": hashes[1643 : 1643+maxTxRetrievals], + "A": hashes[:maxTxRetrievals], }, }, }, @@ -1130,9 +1130,9 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) { }, }, fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x03}, {0x04}}, - "B": {{0x06}}, - "C": {{0x08}}, + "A": {{0x01}, {0x02}, {0x03}}, + "B": {{0x05}}, + "C": {{0x07}}, }, }, }, @@ -1209,8 +1209,8 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { "B": announceB[:maxTxAnnounces/2-1], }, fetching: map[string][]common.Hash{ - "A": hashesA[1643 : 1643+maxTxRetrievals], - "B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), + "A": hashesA[:maxTxRetrievals], + "B": hashesB[:maxTxRetrievals], }, }, // Ensure that adding even one more hash results in dropping the hash @@ -1227,8 +1227,8 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { "B": announceB[:maxTxAnnounces/2-1], }, fetching: map[string][]common.Hash{ - "A": hashesA[1643 : 1643+maxTxRetrievals], - "B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...), + "A": hashesA[:maxTxRetrievals], + "B": hashesB[:maxTxRetrievals], }, }, }, @@ -1759,6 +1759,76 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { }) } +// This test ensures the blob transactions will be scheduled for fetching +// once they are announced in the network. +func TestBlobTransactionAnnounce(t *testing.T) { + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + return NewTxFetcher( + func(common.Hash) bool { return false }, + nil, + func(string, []common.Hash) error { return nil }, + nil, + ) + }, + steps: []interface{}{ + // Initial announcement to get something into the waitlist + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + }), + // Announce a blob transaction + doTxNotify{peer: "B", hashes: []common.Hash{{0x03}}, types: []byte{types.BlobTxType}, sizes: []uint32{333}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + "B": { + {common.Hash{0x03}, types.BlobTxType, 333}, + }, + }), + doWait{time: 0, step: true}, // zero time, but the blob fetching should be scheduled + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + }), + isScheduled{ + tracking: map[string][]announce{ + "B": { + {common.Hash{0x03}, types.BlobTxType, 333}, + }, + }, + fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer + "B": {{0x03}}, + }, + }, + doWait{time: txArriveTimeout, step: true}, // zero time, but the blob fetching should be scheduled + isWaiting(nil), + isScheduled{ + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + "B": { + {common.Hash{0x03}, types.BlobTxType, 333}, + }, + }, + fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer + "A": {{0x01}, {0x02}}, + "B": {{0x03}}, + }, + }, + }, + }) +} + func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) { t.Parallel() testTransactionFetcher(t, tt)