From 7d50a821fb82cd8383aa8a157ada8136ea578a25 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 21 Jun 2023 21:37:36 +0200 Subject: [PATCH] core/txpool/legacypool: implement transaction tracking for handling local txs core/txpool/legacypool: handle the case when journalling is disabled core/txpool: store txs to journal on insert --- core/txpool/legacypool/legacypool.go | 5 + core/txpool/legacypool/tx_tracker.go | 189 +++++++++++++++++++++++++++ core/txpool/txpool.go | 3 +- eth/api_backend.go | 3 + eth/backend.go | 15 ++- 5 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 core/txpool/legacypool/tx_tracker.go diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index f7495dd39f..909f30de15 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -245,6 +245,11 @@ func New(config Config, chain BlockChain) *LegacyPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() + // Disable locals-handling in this pool. This is a precursor to fully + // deleting locals-related code + config.NoLocals = true + config.Locals = nil + // Create the transaction pool with its initial settings pool := &LegacyPool{ config: config, diff --git a/core/txpool/legacypool/tx_tracker.go b/core/txpool/legacypool/tx_tracker.go new file mode 100644 index 0000000000..660de4ae30 --- /dev/null +++ b/core/txpool/legacypool/tx_tracker.go @@ -0,0 +1,189 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package legacypool implements the normal EVM execution transaction pool. +package legacypool + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "golang.org/x/exp/slices" +) + +var recheckInterval = time.Minute + +// TxTracker is a struct used to track priority transactions; it will check from +// time to time if the main pool has forgotten about any of the transaction +// it is tracking, and if so, submit it again. +// This is used to track 'locals'. +// This struct does not care about transaction validity, price-bumps or account limits, +// but optimistically accepts transactions. +type TxTracker struct { + all map[common.Hash]*types.Transaction // All tracked transactions + byAddr map[common.Address]*sortedMap // Transactions by address + + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + shutdownCh chan struct{} + mu sync.Mutex + wg sync.WaitGroup +} + +func NewTxTracker(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { + signer := types.LatestSigner(chainConfig) + pool := &TxTracker{ + all: make(map[common.Hash]*types.Transaction), + byAddr: make(map[common.Address]*sortedMap), + signer: signer, + shutdownCh: make(chan struct{}), + pool: next, + } + if journalPath != "" { + pool.journal = newTxJournal(journalPath) + pool.rejournal = journalTime + } + return pool +} + +// Track adds a transaction to the tracked set. +func (tracker *TxTracker) Track(tx *types.Transaction) { + tracker.TrackAll([]*types.Transaction{tx}) +} + +// TrackAll adds a list of transactions to the tracked set. +func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + for _, tx := range txs { + // If we're already tracking it, it's a no-op + if _, ok := tracker.all[tx.Hash()]; ok { + continue + } + tracker.all[tx.Hash()] = tx + addr, _ := types.Sender(tracker.signer, tx) + if tracker.byAddr[addr] == nil { + tracker.byAddr[addr] = newSortedMap() + } + tracker.byAddr[addr].Put(tx) + _ = tracker.journal.insert(tx) + } +} + +// recheck checks and returns any transactions that needs to be resubmitted. +func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + var ( + numStales = 0 + numOk = 0 + ) + for sender, txs := range tracker.byAddr { + stales := txs.Forward(tracker.pool.Nonce(sender)) + // Wipe the stales + for _, tx := range stales { + delete(tracker.all, tx.Hash()) + } + numStales += len(stales) + // Check the non-stale + for _, tx := range txs.Flatten() { + if tracker.pool.Has(tx.Hash()) { + numOk++ + continue + } + resubmits = append(resubmits, tx) + } + } + + if journalCheck { // rejournal + rejournal = make(map[common.Address]types.Transactions) + for _, tx := range tracker.all { + addr, _ := types.Sender(tracker.signer, tx) + rejournal[addr] = append(rejournal[addr], tx) + } + // Sort them + for _, list := range rejournal { + // cmp(a, b) should return a negative number when a < b, + slices.SortFunc(list, func(a, b *types.Transaction) int { + return int(a.Nonce() - b.Nonce()) + }) + } + } + log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) + return resubmits, rejournal +} + +// Start implements node.Lifecycle interface +// Start is called after all services have been constructed and the networking +// layer was also initialized to spawn any goroutines required by the service. +func (tracker *TxTracker) Start() error { + tracker.wg.Add(1) + go tracker.loop() + return nil +} + +// Start implements node.Lifecycle interface +// Stop terminates all goroutines belonging to the service, blocking until they +// are all terminated. +func (tracker *TxTracker) Stop() error { + close(tracker.shutdownCh) + tracker.wg.Wait() + return nil +} + +func (tracker *TxTracker) loop() { + defer tracker.wg.Done() + if tracker.journal != nil { + tracker.journal.load(func(transactions []*types.Transaction) []error { + tracker.TrackAll(transactions) + return nil + }) + defer tracker.journal.close() + } + var lastJournal = time.Now() + // Do initial check after 10 seconds, do rechecks more seldom. + t := time.NewTimer(10 * time.Second) + for { + select { + case <-tracker.shutdownCh: + return + case <-t.C: + checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal + resubmits, rejournal := tracker.recheck(checkJournal) + if len(resubmits) > 0 { + tracker.pool.Add(resubmits, false, false) + } + if checkJournal { + // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts + tracker.mu.Lock() + lastJournal = time.Now() + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + tracker.mu.Unlock() + } + t.Reset(recheckInterval) + } + } +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 363fa29c02..0f2c1d860a 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -352,7 +352,8 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // back the errors into the original sort order. errsets := make([][]error, len(p.subpools)) for i := 0; i < len(p.subpools); i++ { - errsets[i] = p.subpools[i].Add(txsets[i], local, sync) + // Note: local is explicitly set to false here. + errsets[i] = p.subpools[i].Add(txsets[i], false, sync) } errs := make([]error, len(txs)) for i, split := range splits { diff --git a/eth/api_backend.go b/eth/api_backend.go index 4e81d68e07..9e23dd1ba8 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -280,6 +280,9 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri } func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { + if locals := b.eth.localTxTracker; locals != nil { + locals.Track(signedTx) + } return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] } diff --git a/eth/backend.go b/eth/backend.go index ccfe650f41..b8c4669d6b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -67,9 +67,10 @@ type Config = ethconfig.Config // Ethereum implements the Ethereum full node service. type Ethereum struct { // core protocol objects - config *ethconfig.Config - txPool *txpool.TxPool - blockchain *core.BlockChain + config *ethconfig.Config + txPool *txpool.TxPool + localTxTracker *legacypool.TxTracker + blockchain *core.BlockChain handler *handler discmix *enode.FairMix @@ -234,6 +235,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { legacyPool := legacypool.New(config.TxPool, eth.blockchain) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) + + if !config.TxPool.NoLocals { + eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal, + config.TxPool.Rejournal, + eth.blockchain.Config(), eth.txPool) + stack.RegisterLifecycle(eth.localTxTracker) + } + if err != nil { return nil, err }