diff --git a/cmd/devp2p/internal/ethtest/helpers.go b/cmd/devp2p/internal/ethtest/helpers.go index b57649ade9..eeeb4f93ca 100644 --- a/cmd/devp2p/internal/ethtest/helpers.go +++ b/cmd/devp2p/internal/ethtest/helpers.go @@ -357,13 +357,9 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error { return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash) } return nil - - // ignore tx announcements from previous tests case *NewPooledTransactionHashes: + // ignore tx announcements from previous tests continue - case *Transactions: - continue - default: return fmt.Errorf("unexpected: %s", pretty.Sdump(msg)) } diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 4497478d72..7059b4ba73 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -544,13 +544,9 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) { t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsPacket)) } return - // ignore propagated txs from previous tests case *NewPooledTransactionHashes: continue - case *Transactions: - continue - // ignore block announcements from previous tests case *NewBlockHashes: continue diff --git a/cmd/devp2p/internal/ethtest/transaction.go b/cmd/devp2p/internal/ethtest/transaction.go index baa55bd492..c4748bf8f7 100644 --- a/cmd/devp2p/internal/ethtest/transaction.go +++ b/cmd/devp2p/internal/ethtest/transaction.go @@ -29,7 +29,7 @@ import ( "github.com/ethereum/go-ethereum/params" ) -// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7") +//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7") var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") func (s *Suite) sendSuccessfulTxs(t *utesting.T) error { @@ -192,10 +192,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction nonce = txs[len(txs)-1].Nonce() // Wait for the transaction announcement(s) and make sure all sent txs are being propagated. - // all txs should be announced within a couple announcements. + // all txs should be announced within 3 announcements. recvHashes := make([]common.Hash, 0) - for i := 0; i < 20; i++ { + for i := 0; i < 3; i++ { switch msg := recvConn.readAndServe(s.chain, timeout).(type) { case *Transactions: for _, tx := range *msg { diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 7c8f16df53..035e0c2ec7 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -262,79 +262,57 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // direct request replies. The differentiation is important so the fetcher can // re-schedule missing transactions as soon as possible. func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { - var ( - inMeter = txReplyInMeter - knownMeter = txReplyKnownMeter - underpricedMeter = txReplyUnderpricedMeter - otherRejectMeter = txReplyOtherRejectMeter - ) - if !direct { - inMeter = txBroadcastInMeter - knownMeter = txBroadcastKnownMeter - underpricedMeter = txBroadcastUnderpricedMeter - otherRejectMeter = txBroadcastOtherRejectMeter - } // Keep track of all the propagated transactions - inMeter.Mark(int64(len(txs))) - + if direct { + txReplyInMeter.Mark(int64(len(txs))) + } else { + txBroadcastInMeter.Mark(int64(len(txs))) + } // Push all the transactions into the pool, tracking underpriced ones to avoid // re-requesting them and dropping the peer in case of malicious transfers. var ( - added = make([]common.Hash, 0, len(txs)) - delay time.Duration + added = make([]common.Hash, 0, len(txs)) + duplicate int64 + underpriced int64 + otherreject int64 ) - // proceed in batches - for i := 0; i < len(txs); i += 128 { - end := i + 128 - if end > len(txs) { - end = len(txs) - } - var ( - duplicate int64 - underpriced int64 - otherreject int64 - ) - batch := txs[i:end] - for j, err := range f.addTxs(batch) { - // Track the transaction hash if the price is too low for us. - // Avoid re-request this transaction when we receive another - // announcement. - if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) { - for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { - f.underpriced.Pop() - } - f.underpriced.Add(batch[j].Hash()) + errs := f.addTxs(txs) + for i, err := range errs { + // Track the transaction hash if the price is too low for us. + // Avoid re-request this transaction when we receive another + // announcement. + if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) { + for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { + f.underpriced.Pop() } - // Track a few interesting failure types - switch { - case err == nil: // Noop, but need to handle to not count these + f.underpriced.Add(txs[i].Hash()) + } + // Track a few interesting failure types + switch { + case err == nil: // Noop, but need to handle to not count these - case errors.Is(err, core.ErrAlreadyKnown): - duplicate++ + case errors.Is(err, core.ErrAlreadyKnown): + duplicate++ - case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced): - underpriced++ + case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced): + underpriced++ - default: - otherreject++ - } - added = append(added, batch[j].Hash()) - } - knownMeter.Mark(duplicate) - underpricedMeter.Mark(underpriced) - otherRejectMeter.Mark(otherreject) - - // If 'other reject' is >25% of the deliveries in any batch, abort. Either we are - // out of sync with the chain or the peer is griefing us. - if otherreject > 128/4 { - delay = 200 * time.Millisecond - log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end) - break + default: + otherreject++ } + added = append(added, txs[i].Hash()) + } + if direct { + txReplyKnownMeter.Mark(duplicate) + txReplyUnderpricedMeter.Mark(underpriced) + txReplyOtherRejectMeter.Mark(otherreject) + } else { + txBroadcastKnownMeter.Mark(duplicate) + txBroadcastUnderpricedMeter.Mark(underpriced) + txBroadcastOtherRejectMeter.Mark(otherreject) } select { case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: - time.Sleep(delay) return nil case <-f.quit: return errTerminated