core/txpool, eth, miner: code polish

pull/30559/head
Gary Rong 1 week ago
parent a0283d1e12
commit 926512610a
  1. 24
      core/txpool/legacypool/legacypool.go
  2. 21
      core/txpool/locals/tx_tracker.go
  3. 7
      eth/backend.go
  4. 1
      miner/miner.go

@ -1638,15 +1638,15 @@ func (as *accountSet) merge(other *accountSet) {
// peeking into the pool in LegacyPool.Get without having to acquire the widely scoped // peeking into the pool in LegacyPool.Get without having to acquire the widely scoped
// LegacyPool.mu mutex. // LegacyPool.mu mutex.
type lookup struct { type lookup struct {
slots int slots int
lock sync.RWMutex lock sync.RWMutex
remotes map[common.Hash]*types.Transaction txs map[common.Hash]*types.Transaction
} }
// newLookup returns a new lookup structure. // newLookup returns a new lookup structure.
func newLookup() *lookup { func newLookup() *lookup {
return &lookup{ return &lookup{
remotes: make(map[common.Hash]*types.Transaction), txs: make(map[common.Hash]*types.Transaction),
} }
} }
@ -1656,7 +1656,8 @@ func newLookup() *lookup {
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
for key, value := range t.remotes {
for key, value := range t.txs {
if !f(key, value) { if !f(key, value) {
return return
} }
@ -1667,7 +1668,8 @@ func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
func (t *lookup) Get(hash common.Hash) *types.Transaction { func (t *lookup) Get(hash common.Hash) *types.Transaction {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
return t.remotes[hash]
return t.txs[hash]
} }
// Count returns the current number of transactions in the lookup. // Count returns the current number of transactions in the lookup.
@ -1675,7 +1677,7 @@ func (t *lookup) Count() int {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
return len(t.remotes) return len(t.txs)
} }
// Slots returns the current number of slots used in the lookup. // Slots returns the current number of slots used in the lookup.
@ -1694,7 +1696,7 @@ func (t *lookup) Add(tx *types.Transaction) {
t.slots += numSlots(tx) t.slots += numSlots(tx)
slotsGauge.Update(int64(t.slots)) slotsGauge.Update(int64(t.slots))
t.remotes[tx.Hash()] = tx t.txs[tx.Hash()] = tx
} }
// Remove removes a transaction from the lookup. // Remove removes a transaction from the lookup.
@ -1702,7 +1704,7 @@ func (t *lookup) Remove(hash common.Hash) {
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
tx, ok := t.remotes[hash] tx, ok := t.txs[hash]
if !ok { if !ok {
log.Error("No transaction found to be deleted", "hash", hash) log.Error("No transaction found to be deleted", "hash", hash)
return return
@ -1710,7 +1712,7 @@ func (t *lookup) Remove(hash common.Hash) {
t.slots -= numSlots(tx) t.slots -= numSlots(tx)
slotsGauge.Update(int64(t.slots)) slotsGauge.Update(int64(t.slots))
delete(t.remotes, hash) delete(t.txs, hash)
} }
// TxsBelowTip finds all remote transactions below the given tip threshold. // TxsBelowTip finds all remote transactions below the given tip threshold.
@ -1750,7 +1752,7 @@ func (pool *LegacyPool) Clear() {
// The transaction addition may attempt to reserve the sender addr which // The transaction addition may attempt to reserve the sender addr which
// can't happen until Clear releases the reservation lock. Clear cannot // can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed. // acquire the subpool lock until the transaction addition is completed.
for _, tx := range pool.all.remotes { for _, tx := range pool.all.txs {
senderAddr, _ := types.Sender(pool.signer, tx) senderAddr, _ := types.Sender(pool.signer, tx)
pool.reserve(senderAddr, false) pool.reserve(senderAddr, false)
} }

@ -58,11 +58,10 @@ type TxTracker struct {
// New creates a new TxTracker // New creates a new TxTracker
func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker {
signer := types.LatestSigner(chainConfig)
pool := &TxTracker{ pool := &TxTracker{
all: make(map[common.Hash]*types.Transaction), all: make(map[common.Hash]*types.Transaction),
byAddr: make(map[common.Address]*legacypool.SortedMap), byAddr: make(map[common.Address]*legacypool.SortedMap),
signer: signer, signer: types.LatestSigner(chainConfig),
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
pool: next, pool: next,
} }
@ -84,6 +83,7 @@ func (tracker *TxTracker) Track(tx *types.Transaction) {
func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
tracker.mu.Lock() tracker.mu.Lock()
defer tracker.mu.Unlock() defer tracker.mu.Unlock()
for _, tx := range txs { for _, tx := range txs {
if tx.Type() == types.BlobTxType { if tx.Type() == types.BlobTxType {
continue continue
@ -101,6 +101,7 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
tracker.byAddr[addr] = legacypool.NewSortedMap() tracker.byAddr[addr] = legacypool.NewSortedMap()
} }
tracker.byAddr[addr].Put(tx) tracker.byAddr[addr].Put(tx)
if tracker.journal != nil { if tracker.journal != nil {
_ = tracker.journal.insert(tx) _ = tracker.journal.insert(tx)
} }
@ -112,17 +113,19 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
tracker.mu.Lock() tracker.mu.Lock()
defer tracker.mu.Unlock() defer tracker.mu.Unlock()
var ( var (
numStales = 0 numStales = 0
numOk = 0 numOk = 0
) )
for sender, txs := range tracker.byAddr { for sender, txs := range tracker.byAddr {
stales := txs.Forward(tracker.pool.Nonce(sender))
// Wipe the stales // Wipe the stales
stales := txs.Forward(tracker.pool.Nonce(sender))
for _, tx := range stales { for _, tx := range stales {
delete(tracker.all, tx.Hash()) delete(tracker.all, tx.Hash())
} }
numStales += len(stales) numStales += len(stales)
// Check the non-stale // Check the non-stale
for _, tx := range txs.Flatten() { for _, tx := range txs.Flatten() {
if tracker.pool.Has(tx.Hash()) { if tracker.pool.Has(tx.Hash()) {
@ -172,6 +175,7 @@ func (tracker *TxTracker) Stop() error {
func (tracker *TxTracker) loop() { func (tracker *TxTracker) loop() {
defer tracker.wg.Done() defer tracker.wg.Done()
if tracker.journal != nil { if tracker.journal != nil {
tracker.journal.load(func(transactions []*types.Transaction) []error { tracker.journal.load(func(transactions []*types.Transaction) []error {
tracker.TrackAll(transactions) tracker.TrackAll(transactions)
@ -179,14 +183,15 @@ func (tracker *TxTracker) loop() {
}) })
defer tracker.journal.close() defer tracker.journal.close()
} }
var lastJournal = time.Now() var (
// Do initial check after 10 seconds, do rechecks more seldom. lastJournal = time.Now()
t := time.NewTimer(10 * time.Second) timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom.
)
for { for {
select { select {
case <-tracker.shutdownCh: case <-tracker.shutdownCh:
return return
case <-t.C: case <-timer.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal) resubmits, rejournal := tracker.recheck(checkJournal)
if len(resubmits) > 0 { if len(resubmits) > 0 {
@ -201,7 +206,7 @@ func (tracker *TxTracker) loop() {
} }
tracker.mu.Unlock() tracker.mu.Unlock()
} }
t.Reset(recheckInterval) timer.Reset(recheckInterval)
} }
} }
} }

@ -237,6 +237,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
legacyPool := legacypool.New(config.TxPool, eth.blockchain) legacyPool := legacypool.New(config.TxPool, eth.blockchain)
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool})
if err != nil {
return nil, err
}
if !config.TxPool.NoLocals { if !config.TxPool.NoLocals {
rejournal := config.TxPool.Rejournal rejournal := config.TxPool.Rejournal
@ -247,10 +250,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
stack.RegisterLifecycle(eth.localTxTracker) stack.RegisterLifecycle(eth.localTxTracker)
} }
if err != nil {
return nil, err
}
// Permit the downloader to use the trie cache allowance during fast sync // Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{ if eth.handler, err = newHandler(&handlerConfig{

@ -110,6 +110,7 @@ func (miner *Miner) SetExtra(extra []byte) error {
return nil return nil
} }
// SetPrioAddresses sets a list of addresses to prioritize for transaction inclusion.
func (miner *Miner) SetPrioAddresses(prio []common.Address) { func (miner *Miner) SetPrioAddresses(prio []common.Address) {
miner.confMu.Lock() miner.confMu.Lock()
miner.prio = prio miner.prio = prio

Loading…
Cancel
Save