core/txpool: remove locals-tracking from subpools

Martin Holst Swende 4 months ago
parent 887d51b900
commit b594bae8da
No known key found for this signature in database
GPG Key ID: 683B438C05A5DDF0
  1. 2
      core/txpool/blobpool/blobpool.go
  2. 324
      core/txpool/legacypool/legacypool.go
  3. 15
      core/txpool/legacypool/list.go
  4. 5
      core/txpool/subpool.go
  5. 19
      core/txpool/txpool.go
  6. 3
      eth/backend.go

@ -1262,7 +1262,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
var (
adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))

@ -99,7 +99,6 @@ var (
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
@ -159,10 +158,6 @@ var DefaultConfig = Config{
// unreasonable or unworkable.
func (config *Config) sanitize() Config {
conf := *config
if conf.Rejournal < time.Second {
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
conf.Rejournal = time.Second
}
if conf.PriceLimit < 1 {
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit)
conf.PriceLimit = DefaultConfig.PriceLimit
@ -214,9 +209,6 @@ type LegacyPool struct {
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
@ -245,11 +237,6 @@ func New(config Config, chain BlockChain) *LegacyPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Disable locals-handling in this pool. This is a precursor to fully
// deleting locals-related code
config.NoLocals = true
config.Locals = nil
// Create the transaction pool with its initial settings
pool := &LegacyPool{
config: config,
@ -267,16 +254,8 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
pool.priced = newPricedList(pool.all)
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
}
return pool
}
@ -292,8 +271,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
}
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings. The internal
// head to allow balance / nonce checks. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts
@ -316,20 +294,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
// Start the reorg loop early, so it can handle requests generated during
// journal loading.
pool.wg.Add(1)
go pool.scheduleReorgLoop()
// If local transactions and journaling is enabled, load from disk
if pool.journal != nil {
if err := pool.journal.load(pool.addLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
pool.wg.Add(1)
go pool.loop()
return nil
@ -345,13 +312,11 @@ func (pool *LegacyPool) loop() {
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
)
defer report.Stop()
defer evict.Stop()
defer journal.Stop()
// Notify tests that the init phase is done
close(pool.initDoneCh)
@ -377,11 +342,7 @@ func (pool *LegacyPool) loop() {
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
// Any old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
@ -391,16 +352,6 @@ func (pool *LegacyPool) loop() {
}
}
pool.mu.Unlock()
// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}
@ -411,9 +362,6 @@ func (pool *LegacyPool) Close() error {
close(pool.reorgShutdownCh)
pool.wg.Wait()
if pool.journal != nil {
pool.journal.close()
}
log.Info("Transaction pool stopped")
return nil
}
@ -554,7 +502,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
txs := list.Flatten()
// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
if minTipBig != nil {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
@ -582,35 +530,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
return pending
}
// Locals retrieves the accounts currently considered local by the pool.
func (pool *LegacyPool) Locals() []common.Address {
pool.mu.Lock()
defer pool.mu.Unlock()
return pool.locals.flatten()
}
// local retrieves all currently known local transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
func (pool *LegacyPool) local() map[common.Address]types.Transactions {
txs := make(map[common.Address]types.Transactions)
for addr := range pool.locals.accounts {
if pending := pool.pending[addr]; pending != nil {
txs[addr] = append(txs[addr], pending.Flatten()...)
}
if queued := pool.queue[addr]; queued != nil {
txs[addr] = append(txs[addr], queued.Flatten()...)
}
}
return txs
}
// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error {
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error {
opts := &txpool.ValidationOptions{
Config: pool.chainconfig,
Accept: 0 |
@ -620,9 +544,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load().ToBig(),
}
if local {
opts.MinTip = new(big.Int)
}
if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil {
return err
}
@ -631,7 +552,7 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
opts := &txpool.ValidationOptionsWithState{
State: pool.currentState,
@ -670,11 +591,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
// add validates a transaction and inserts it into the non-executable queue for later
// pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
// added to the allowlist, preventing any associated transaction from being dropped
// out of the pool due to pricing constraints.
func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
@ -682,12 +599,9 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
knownTxMeter.Mark(1)
return false, txpool.ErrAlreadyKnown
}
// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, isLocal); err != nil {
if err := pool.validateTx(tx); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1)
return false, err
@ -720,7 +634,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
if pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
return false, txpool.ErrUnderpriced
@ -736,19 +650,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions.
// Otherwise if we can't make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
// If we can't make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx))
// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
if !success {
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// If the new transaction is a future transaction it should never churn pending transactions
if !isLocal && pool.isGapped(from, tx) {
if pool.isGapped(from, tx) {
var replacesPending bool
for _, dropTx := range drop {
dropSender, _ := types.Sender(pool.signer, dropTx)
@ -760,7 +673,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// Add all transactions back to the priced queue
if replacesPending {
for _, dropTx := range drop {
pool.priced.Put(dropTx, false)
pool.priced.Put(dropTx)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
return false, txpool.ErrFutureReplacePending
@ -793,9 +706,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.all.Add(tx)
pool.priced.Put(tx)
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
@ -804,20 +716,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
replaced, err = pool.enqueueTx(hash, tx, true)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
localGauge.Inc(1)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
@ -850,7 +752,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
@ -877,8 +779,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
pool.all.Add(tx, local)
pool.priced.Put(tx, local)
pool.all.Add(tx)
pool.priced.Put(tx)
}
// If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist {
@ -887,18 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
return old != nil, nil
}
// journalTx adds the specified transaction to the local disk journal if it is
// deemed to have been sent from a local account.
func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {
return
}
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}
// promoteTx adds a transaction to the pending (processable) list of transactions
// and returns whether it was inserted or an older was better.
//
@ -935,28 +825,13 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
return true
}
// addLocals enqueues a batch of transactions into the pool if they are valid, marking the
// senders as local ones, ensuring they go around the local pricing constraints.
//
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error {
return pool.Add(txs, !pool.config.NoLocals, true)
}
// addLocal enqueues a single local transaction into the pool if it is valid. This is
// a convenience wrapper around addLocals.
func (pool *LegacyPool) addLocal(tx *types.Transaction) error {
return pool.addLocals([]*types.Transaction{tx})[0]
}
// addRemotes enqueues a batch of transactions into the pool if they are valid. If the
// senders are not among the locally tracked ones, full pricing constraints will apply.
// addRemotes enqueues a batch of transactions into the pool if they are valid.
// Full pricing constraints will apply.
//
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *LegacyPool) addRemotes(txs []*types.Transaction) []error {
return pool.Add(txs, false, false)
return pool.Add(txs, false)
}
// addRemote enqueues a single transaction into the pool if it is valid. This is a convenience
@ -967,23 +842,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error {
// addRemotesSync is like addRemotes, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error {
return pool.Add(txs, false, true)
return pool.Add(txs, true)
}
// This is like addRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
return pool.Add([]*types.Transaction{tx}, false, true)[0]
return pool.Add([]*types.Transaction{tx}, true)[0]
}
// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing constraints will or will not be applied.
// Add enqueues a batch of transactions into the pool if they are valid.
//
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals
func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
@ -999,7 +870,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// Exclude transactions with basic errors, e.g invalid signatures and
// insufficient intrinsic gas as soon as possible and cache senders
// in transactions before obtaining lock
if err := pool.validateTxBasics(tx, local); err != nil {
if err := pool.validateTxBasics(tx); err != nil {
errs[i] = err
log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1)
@ -1014,7 +885,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// Process all the new transaction and merge any errors into the original slice
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
newErrs, dirtyAddrs := pool.addTxsLocked(news)
pool.mu.Unlock()
var nilSlot = 0
@ -1035,11 +906,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs))
for i, tx := range txs {
replaced, err := pool.add(tx, local)
replaced, err := pool.add(tx)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx)
@ -1131,9 +1002,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if outofbound {
pool.priced.Removed(1)
}
if pool.locals.contains(addr) {
localGauge.Dec(1)
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
@ -1144,7 +1012,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Postpone any invalidated transactions
for _, tx := range invalids {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(tx.Hash(), tx, false, false)
pool.enqueueTx(tx.Hash(), tx, false)
}
// Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce())
@ -1446,7 +1314,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
pool.addTxsLocked(reinject)
}
// promoteExecutables moves transactions that have become processable from the
@ -1491,22 +1359,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps types.Transactions
if !pool.locals.contains(addr) {
caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
var caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
@ -1536,14 +1399,14 @@ func (pool *LegacyPool) truncatePending() {
spammers := prque.New[int64, common.Address](nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
if uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
// Retrieve the next offender
offender, _ := spammers.Pop()
offenders = append(offenders, offender)
@ -1569,9 +1432,7 @@ func (pool *LegacyPool) truncatePending() {
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
}
pending--
}
}
@ -1596,9 +1457,6 @@ func (pool *LegacyPool) truncatePending() {
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
pending--
}
}
@ -1619,13 +1477,11 @@ func (pool *LegacyPool) truncateQueue() {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
sort.Sort(sort.Reverse(addresses))
// Drop transactions until the total is below the limit or only locals remain
// Drop transactions until the total is below the limit
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
@ -1685,12 +1541,10 @@ func (pool *LegacyPool) demoteUnexecutables() {
log.Trace("Demoting pending transaction", "hash", hash)
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
pool.enqueueTx(hash, tx, false)
}
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
gapped := list.Cap(0)
@ -1699,7 +1553,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
log.Error("Demoting invalidated transaction", "hash", hash)
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
pool.enqueueTx(hash, tx, false)
}
pendingGauge.Dec(int64(len(gapped)))
}
@ -1798,20 +1652,15 @@ func (as *accountSet) merge(other *accountSet) {
// internal mechanisms. The sole purpose of the type is to permit out-of-bound
// peeking into the pool in LegacyPool.Get without having to acquire the widely scoped
// LegacyPool.mu mutex.
//
// This lookup set combines the notion of "local transactions", which is useful
// to build upper-level structure.
type lookup struct {
slots int
lock sync.RWMutex
locals map[common.Hash]*types.Transaction
remotes map[common.Hash]*types.Transaction
}
// newLookup returns a new lookup structure.
func newLookup() *lookup {
return &lookup{
locals: make(map[common.Hash]*types.Transaction),
remotes: make(map[common.Hash]*types.Transaction),
}
}
@ -1819,22 +1668,12 @@ func newLookup() *lookup {
// Range calls f on each key and value present in the map. The callback passed
// should return the indicator whether the iteration needs to be continued.
// Callers need to specify which set (or both) to be iterated.
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) {
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
t.lock.RLock()
defer t.lock.RUnlock()
if local {
for key, value := range t.locals {
if !f(key, value, true) {
return
}
}
}
if remote {
for key, value := range t.remotes {
if !f(key, value, false) {
return
}
for key, value := range t.remotes {
if !f(key, value) {
return
}
}
}
@ -1843,21 +1682,9 @@ func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local boo
func (t *lookup) Get(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
if tx := t.locals[hash]; tx != nil {
return tx
}
return t.remotes[hash]
}
// GetLocal returns a transaction if it exists in the lookup, or nil if not found.
func (t *lookup) GetLocal(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
return t.locals[hash]
}
// GetRemote returns a transaction if it exists in the lookup, or nil if not found.
func (t *lookup) GetRemote(hash common.Hash) *types.Transaction {
t.lock.RLock()
@ -1871,15 +1698,7 @@ func (t *lookup) Count() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.locals) + len(t.remotes)
}
// LocalCount returns the current number of local transactions in the lookup.
func (t *lookup) LocalCount() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.locals)
return len(t.remotes)
}
// RemoteCount returns the current number of remote transactions in the lookup.
@ -1899,18 +1718,14 @@ func (t *lookup) Slots() int {
}
// Add adds a transaction to the lookup.
func (t *lookup) Add(tx *types.Transaction, local bool) {
func (t *lookup) Add(tx *types.Transaction) {
t.lock.Lock()
defer t.lock.Unlock()
t.slots += numSlots(tx)
slotsGauge.Update(int64(t.slots))
if local {
t.locals[tx.Hash()] = tx
} else {
t.remotes[tx.Hash()] = tx
}
t.remotes[tx.Hash()] = tx
}
// Remove removes a transaction from the lookup.
@ -1918,10 +1733,7 @@ func (t *lookup) Remove(hash common.Hash) {
t.lock.Lock()
defer t.lock.Unlock()
tx, ok := t.locals[hash]
if !ok {
tx, ok = t.remotes[hash]
}
tx, ok := t.remotes[hash]
if !ok {
log.Error("No transaction found to be deleted", "hash", hash)
return
@ -1929,36 +1741,18 @@ func (t *lookup) Remove(hash common.Hash) {
t.slots -= numSlots(tx)
slotsGauge.Update(int64(t.slots))
delete(t.locals, hash)
delete(t.remotes, hash)
}
// RemoteToLocals migrates the transactions belongs to the given locals to locals
// set. The assumption is held the locals set is thread-safe to be used.
func (t *lookup) RemoteToLocals(locals *accountSet) int {
t.lock.Lock()
defer t.lock.Unlock()
var migrated int
for hash, tx := range t.remotes {
if locals.containsTx(tx) {
t.locals[hash] = tx
delete(t.remotes, hash)
migrated += 1
}
}
return migrated
}
// RemotesBelowTip finds all remote transactions below the given tip threshold.
func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
found := make(types.Transactions, 0, 128)
t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
t.Range(func(hash common.Hash, tx *types.Transaction) bool {
if tx.GasTipCapIntCmp(threshold) < 0 {
found = append(found, tx)
}
return true
}, false, true) // Only iterate remotes
})
return found
}

@ -556,10 +556,7 @@ func newPricedList(all *lookup) *pricedList {
}
// Put inserts a new transaction into the heap.
func (l *pricedList) Put(tx *types.Transaction, local bool) {
if local {
return
}
func (l *pricedList) Put(tx *types.Transaction) {
// Insert every new transaction to the urgent heap first; Discard will balance the heaps
heap.Push(&l.urgent, tx)
}
@ -612,9 +609,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
// If noPending is set to true, we will only consider the floating list
//
// Note local transaction won't be considered for eviction.
func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
func (l *pricedList) Discard(slots int) (types.Transactions, bool) {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
for slots > 0 {
if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio {
@ -643,7 +638,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
}
}
// If we still can't make enough room for the new transaction
if slots > 0 && !force {
if slots > 0 {
for _, tx := range drop {
heap.Push(&l.urgent, tx)
}
@ -659,10 +654,10 @@ func (l *pricedList) Reheap() {
start := time.Now()
l.stales.Store(0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
l.urgent.list = append(l.urgent.list, tx)
return true
}, false, true) // Only iterate remotes
})
heap.Init(&l.urgent)
// balance out the two heaps by moving the worse half of transactions into the

@ -132,7 +132,7 @@ type SubPool interface {
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Add(txs []*types.Transaction, local bool, sync bool) []error
Add(txs []*types.Transaction, sync bool) []error
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
@ -162,9 +162,6 @@ type SubPool interface {
// pending as well as queued transactions of this address, grouped by nonce.
ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
// Locals retrieves the accounts currently considered local by the pool.
Locals() []common.Address
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
Status(hash common.Hash) TxStatus

@ -353,7 +353,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errsets := make([][]error, len(p.subpools))
for i := 0; i < len(p.subpools); i++ {
// Note: local is explicitly set to false here.
errsets[i] = p.subpools[i].Add(txsets[i], false, sync)
errsets[i] = p.subpools[i].Add(txsets[i], sync)
}
errs := make([]error, len(txs))
for i, split := range splits {
@ -454,23 +454,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type
return []*types.Transaction{}, []*types.Transaction{}
}
// Locals retrieves the accounts currently considered local by the pool.
func (p *TxPool) Locals() []common.Address {
// Retrieve the locals from each subpool and deduplicate them
locals := make(map[common.Address]struct{})
for _, subpool := range p.subpools {
for _, local := range subpool.Locals() {
locals[local] = struct{}{}
}
}
// Flatten and return the deduplicated local set
flat := make([]common.Address, 0, len(locals))
for local := range locals {
flat = append(flat, local)
}
return flat
}
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by its hash.
func (p *TxPool) Status(hash common.Hash) TxStatus {

@ -237,6 +237,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool})
if !config.TxPool.NoLocals {
// TODO!
// We also need to handle config.Locals, the accounts that are
// to be treated as locals, regardless of how they arrive to geth.
eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal,
config.TxPool.Rejournal,
eth.blockchain.Config(), eth.txPool)

Loading…
Cancel
Save