diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 48673a5f3c..1440af5440 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1638,15 +1638,15 @@ func (as *accountSet) merge(other *accountSet) { // peeking into the pool in LegacyPool.Get without having to acquire the widely scoped // LegacyPool.mu mutex. type lookup struct { - slots int - lock sync.RWMutex - remotes map[common.Hash]*types.Transaction + slots int + lock sync.RWMutex + txs map[common.Hash]*types.Transaction } // newLookup returns a new lookup structure. func newLookup() *lookup { return &lookup{ - remotes: make(map[common.Hash]*types.Transaction), + txs: make(map[common.Hash]*types.Transaction), } } @@ -1656,7 +1656,8 @@ func newLookup() *lookup { func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { t.lock.RLock() defer t.lock.RUnlock() - for key, value := range t.remotes { + + for key, value := range t.txs { if !f(key, value) { return } @@ -1667,7 +1668,8 @@ func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { func (t *lookup) Get(hash common.Hash) *types.Transaction { t.lock.RLock() defer t.lock.RUnlock() - return t.remotes[hash] + + return t.txs[hash] } // Count returns the current number of transactions in the lookup. @@ -1675,7 +1677,7 @@ func (t *lookup) Count() int { t.lock.RLock() defer t.lock.RUnlock() - return len(t.remotes) + return len(t.txs) } // Slots returns the current number of slots used in the lookup. @@ -1694,7 +1696,7 @@ func (t *lookup) Add(tx *types.Transaction) { t.slots += numSlots(tx) slotsGauge.Update(int64(t.slots)) - t.remotes[tx.Hash()] = tx + t.txs[tx.Hash()] = tx } // Remove removes a transaction from the lookup. @@ -1702,7 +1704,7 @@ func (t *lookup) Remove(hash common.Hash) { t.lock.Lock() defer t.lock.Unlock() - tx, ok := t.remotes[hash] + tx, ok := t.txs[hash] if !ok { log.Error("No transaction found to be deleted", "hash", hash) return @@ -1710,7 +1712,7 @@ func (t *lookup) Remove(hash common.Hash) { t.slots -= numSlots(tx) slotsGauge.Update(int64(t.slots)) - delete(t.remotes, hash) + delete(t.txs, hash) } // TxsBelowTip finds all remote transactions below the given tip threshold. @@ -1750,7 +1752,7 @@ func (pool *LegacyPool) Clear() { // The transaction addition may attempt to reserve the sender addr which // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. - for _, tx := range pool.all.remotes { + for _, tx := range pool.all.txs { senderAddr, _ := types.Sender(pool.signer, tx) pool.reserve(senderAddr, false) } diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go index e8783780d7..a24fcb1f4e 100644 --- a/core/txpool/locals/tx_tracker.go +++ b/core/txpool/locals/tx_tracker.go @@ -58,11 +58,10 @@ type TxTracker struct { // New creates a new TxTracker func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { - signer := types.LatestSigner(chainConfig) pool := &TxTracker{ all: make(map[common.Hash]*types.Transaction), byAddr: make(map[common.Address]*legacypool.SortedMap), - signer: signer, + signer: types.LatestSigner(chainConfig), shutdownCh: make(chan struct{}), pool: next, } @@ -84,6 +83,7 @@ func (tracker *TxTracker) Track(tx *types.Transaction) { func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { tracker.mu.Lock() defer tracker.mu.Unlock() + for _, tx := range txs { if tx.Type() == types.BlobTxType { continue @@ -101,6 +101,7 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { tracker.byAddr[addr] = legacypool.NewSortedMap() } tracker.byAddr[addr].Put(tx) + if tracker.journal != nil { _ = tracker.journal.insert(tx) } @@ -112,17 +113,19 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { tracker.mu.Lock() defer tracker.mu.Unlock() + var ( numStales = 0 numOk = 0 ) for sender, txs := range tracker.byAddr { - stales := txs.Forward(tracker.pool.Nonce(sender)) // Wipe the stales + stales := txs.Forward(tracker.pool.Nonce(sender)) for _, tx := range stales { delete(tracker.all, tx.Hash()) } numStales += len(stales) + // Check the non-stale for _, tx := range txs.Flatten() { if tracker.pool.Has(tx.Hash()) { @@ -172,6 +175,7 @@ func (tracker *TxTracker) Stop() error { func (tracker *TxTracker) loop() { defer tracker.wg.Done() + if tracker.journal != nil { tracker.journal.load(func(transactions []*types.Transaction) []error { tracker.TrackAll(transactions) @@ -179,14 +183,15 @@ func (tracker *TxTracker) loop() { }) defer tracker.journal.close() } - var lastJournal = time.Now() - // Do initial check after 10 seconds, do rechecks more seldom. - t := time.NewTimer(10 * time.Second) + var ( + lastJournal = time.Now() + timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. + ) for { select { case <-tracker.shutdownCh: return - case <-t.C: + case <-timer.C: checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal resubmits, rejournal := tracker.recheck(checkJournal) if len(resubmits) > 0 { @@ -201,7 +206,7 @@ func (tracker *TxTracker) loop() { } tracker.mu.Unlock() } - t.Reset(recheckInterval) + timer.Reset(recheckInterval) } } } diff --git a/eth/backend.go b/eth/backend.go index 5cf1a338b6..fea7e4e1fe 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -237,6 +237,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { legacyPool := legacypool.New(config.TxPool, eth.blockchain) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) + if err != nil { + return nil, err + } if !config.TxPool.NoLocals { rejournal := config.TxPool.Rejournal @@ -247,10 +250,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) stack.RegisterLifecycle(eth.localTxTracker) } - - if err != nil { - return nil, err - } // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ diff --git a/miner/miner.go b/miner/miner.go index 77cd14653e..595ef8081c 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -110,6 +110,7 @@ func (miner *Miner) SetExtra(extra []byte) error { return nil } +// SetPrioAddresses sets a list of addresses to prioritize for transaction inclusion. func (miner *Miner) SetPrioAddresses(prio []common.Address) { miner.confMu.Lock() miner.prio = prio