|
|
|
@ -262,40 +262,22 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { |
|
|
|
|
// direct request replies. The differentiation is important so the fetcher can
|
|
|
|
|
// re-schedule missing transactions as soon as possible.
|
|
|
|
|
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { |
|
|
|
|
var ( |
|
|
|
|
inMeter = txReplyInMeter |
|
|
|
|
knownMeter = txReplyKnownMeter |
|
|
|
|
underpricedMeter = txReplyUnderpricedMeter |
|
|
|
|
otherRejectMeter = txReplyOtherRejectMeter |
|
|
|
|
) |
|
|
|
|
if !direct { |
|
|
|
|
inMeter = txBroadcastInMeter |
|
|
|
|
knownMeter = txBroadcastKnownMeter |
|
|
|
|
underpricedMeter = txBroadcastUnderpricedMeter |
|
|
|
|
otherRejectMeter = txBroadcastOtherRejectMeter |
|
|
|
|
} |
|
|
|
|
// Keep track of all the propagated transactions
|
|
|
|
|
inMeter.Mark(int64(len(txs))) |
|
|
|
|
|
|
|
|
|
if direct { |
|
|
|
|
txReplyInMeter.Mark(int64(len(txs))) |
|
|
|
|
} else { |
|
|
|
|
txBroadcastInMeter.Mark(int64(len(txs))) |
|
|
|
|
} |
|
|
|
|
// Push all the transactions into the pool, tracking underpriced ones to avoid
|
|
|
|
|
// re-requesting them and dropping the peer in case of malicious transfers.
|
|
|
|
|
var ( |
|
|
|
|
added = make([]common.Hash, 0, len(txs)) |
|
|
|
|
delay time.Duration |
|
|
|
|
) |
|
|
|
|
// proceed in batches
|
|
|
|
|
for i := 0; i < len(txs); i += 128 { |
|
|
|
|
end := i + 128 |
|
|
|
|
if end > len(txs) { |
|
|
|
|
end = len(txs) |
|
|
|
|
} |
|
|
|
|
var ( |
|
|
|
|
duplicate int64 |
|
|
|
|
underpriced int64 |
|
|
|
|
otherreject int64 |
|
|
|
|
) |
|
|
|
|
batch := txs[i:end] |
|
|
|
|
for j, err := range f.addTxs(batch) { |
|
|
|
|
errs := f.addTxs(txs) |
|
|
|
|
for i, err := range errs { |
|
|
|
|
// Track the transaction hash if the price is too low for us.
|
|
|
|
|
// Avoid re-request this transaction when we receive another
|
|
|
|
|
// announcement.
|
|
|
|
@ -303,7 +285,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) |
|
|
|
|
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { |
|
|
|
|
f.underpriced.Pop() |
|
|
|
|
} |
|
|
|
|
f.underpriced.Add(batch[j].Hash()) |
|
|
|
|
f.underpriced.Add(txs[i].Hash()) |
|
|
|
|
} |
|
|
|
|
// Track a few interesting failure types
|
|
|
|
|
switch { |
|
|
|
@ -318,23 +300,19 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) |
|
|
|
|
default: |
|
|
|
|
otherreject++ |
|
|
|
|
} |
|
|
|
|
added = append(added, batch[j].Hash()) |
|
|
|
|
} |
|
|
|
|
knownMeter.Mark(duplicate) |
|
|
|
|
underpricedMeter.Mark(underpriced) |
|
|
|
|
otherRejectMeter.Mark(otherreject) |
|
|
|
|
|
|
|
|
|
// If 'other reject' is >25% of the deliveries in any batch, abort. Either we are
|
|
|
|
|
// out of sync with the chain or the peer is griefing us.
|
|
|
|
|
if otherreject > 128/4 { |
|
|
|
|
delay = 200 * time.Millisecond |
|
|
|
|
log.Warn("Peer delivering useless transactions", "peer", peer, "ignored", len(txs)-end) |
|
|
|
|
break |
|
|
|
|
added = append(added, txs[i].Hash()) |
|
|
|
|
} |
|
|
|
|
if direct { |
|
|
|
|
txReplyKnownMeter.Mark(duplicate) |
|
|
|
|
txReplyUnderpricedMeter.Mark(underpriced) |
|
|
|
|
txReplyOtherRejectMeter.Mark(otherreject) |
|
|
|
|
} else { |
|
|
|
|
txBroadcastKnownMeter.Mark(duplicate) |
|
|
|
|
txBroadcastUnderpricedMeter.Mark(underpriced) |
|
|
|
|
txBroadcastOtherRejectMeter.Mark(otherreject) |
|
|
|
|
} |
|
|
|
|
select { |
|
|
|
|
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}: |
|
|
|
|
time.Sleep(delay) |
|
|
|
|
return nil |
|
|
|
|
case <-f.quit: |
|
|
|
|
return errTerminated |
|
|
|
|