|
|
@ -36,23 +36,26 @@ import ( |
|
|
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
var ( |
|
|
|
// Transaction Pool Errors
|
|
|
|
// Transaction Pool Errors
|
|
|
|
ErrInvalidSender = errors.New("Invalid sender") |
|
|
|
ErrInvalidSender = errors.New("invalid sender") |
|
|
|
ErrNonce = errors.New("Nonce too low") |
|
|
|
ErrNonce = errors.New("nonce too low") |
|
|
|
ErrCheap = errors.New("Gas price too low for acceptance") |
|
|
|
ErrUnderpriced = errors.New("transaction underpriced") |
|
|
|
ErrBalance = errors.New("Insufficient balance") |
|
|
|
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") |
|
|
|
ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value") |
|
|
|
ErrBalance = errors.New("insufficient balance") |
|
|
|
ErrIntrinsicGas = errors.New("Intrinsic gas too low") |
|
|
|
ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") |
|
|
|
ErrGasLimit = errors.New("Exceeds block gas limit") |
|
|
|
ErrIntrinsicGas = errors.New("intrinsic gas too low") |
|
|
|
ErrNegativeValue = errors.New("Negative value") |
|
|
|
ErrGasLimit = errors.New("exceeds block gas limit") |
|
|
|
|
|
|
|
ErrNegativeValue = errors.New("negative value") |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
var ( |
|
|
|
minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
|
|
|
|
minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
|
|
|
|
maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
|
|
|
|
maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
|
|
|
|
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
|
|
|
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
|
|
|
maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts
|
|
|
|
maxQueuedTotal = uint64(1024) // Max limit of queued transactions from all accounts
|
|
|
|
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
|
|
|
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
|
|
|
|
|
|
|
minPriceBumpPercent = int64(10) // Minimum price bump needed to replace an old transaction
|
|
|
|
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
|
|
|
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
|
|
|
|
|
|
|
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
|
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
var ( |
|
|
@ -70,6 +73,7 @@ var ( |
|
|
|
|
|
|
|
|
|
|
|
// General tx metrics
|
|
|
|
// General tx metrics
|
|
|
|
invalidTxCounter = metrics.NewCounter("txpool/invalid") |
|
|
|
invalidTxCounter = metrics.NewCounter("txpool/invalid") |
|
|
|
|
|
|
|
underpricedTxCounter = metrics.NewCounter("txpool/underpriced") |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
type stateFn func() (*state.StateDB, error) |
|
|
|
type stateFn func() (*state.StateDB, error) |
|
|
@ -86,17 +90,18 @@ type TxPool struct { |
|
|
|
currentState stateFn // The state function which will allow us to do some pre checks
|
|
|
|
currentState stateFn // The state function which will allow us to do some pre checks
|
|
|
|
pendingState *state.ManagedState |
|
|
|
pendingState *state.ManagedState |
|
|
|
gasLimit func() *big.Int // The current gas limit function callback
|
|
|
|
gasLimit func() *big.Int // The current gas limit function callback
|
|
|
|
minGasPrice *big.Int |
|
|
|
gasPrice *big.Int |
|
|
|
eventMux *event.TypeMux |
|
|
|
eventMux *event.TypeMux |
|
|
|
events *event.TypeMuxSubscription |
|
|
|
events *event.TypeMuxSubscription |
|
|
|
localTx *txSet |
|
|
|
locals *txSet |
|
|
|
signer types.Signer |
|
|
|
signer types.Signer |
|
|
|
mu sync.RWMutex |
|
|
|
mu sync.RWMutex |
|
|
|
|
|
|
|
|
|
|
|
pending map[common.Address]*txList // All currently processable transactions
|
|
|
|
pending map[common.Address]*txList // All currently processable transactions
|
|
|
|
queue map[common.Address]*txList // Queued but non-processable transactions
|
|
|
|
queue map[common.Address]*txList // Queued but non-processable transactions
|
|
|
|
all map[common.Hash]*types.Transaction // All transactions to allow lookups
|
|
|
|
|
|
|
|
beats map[common.Address]time.Time // Last heartbeat from each known account
|
|
|
|
beats map[common.Address]time.Time // Last heartbeat from each known account
|
|
|
|
|
|
|
|
all map[common.Hash]*types.Transaction // All transactions to allow lookups
|
|
|
|
|
|
|
|
priced *txPricedList // All transactions sorted by price
|
|
|
|
|
|
|
|
|
|
|
|
wg sync.WaitGroup // for shutdown sync
|
|
|
|
wg sync.WaitGroup // for shutdown sync
|
|
|
|
quit chan struct{} |
|
|
|
quit chan struct{} |
|
|
@ -110,18 +115,18 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState |
|
|
|
signer: types.NewEIP155Signer(config.ChainId), |
|
|
|
signer: types.NewEIP155Signer(config.ChainId), |
|
|
|
pending: make(map[common.Address]*txList), |
|
|
|
pending: make(map[common.Address]*txList), |
|
|
|
queue: make(map[common.Address]*txList), |
|
|
|
queue: make(map[common.Address]*txList), |
|
|
|
all: make(map[common.Hash]*types.Transaction), |
|
|
|
|
|
|
|
beats: make(map[common.Address]time.Time), |
|
|
|
beats: make(map[common.Address]time.Time), |
|
|
|
|
|
|
|
all: make(map[common.Hash]*types.Transaction), |
|
|
|
eventMux: eventMux, |
|
|
|
eventMux: eventMux, |
|
|
|
currentState: currentStateFn, |
|
|
|
currentState: currentStateFn, |
|
|
|
gasLimit: gasLimitFn, |
|
|
|
gasLimit: gasLimitFn, |
|
|
|
minGasPrice: new(big.Int), |
|
|
|
gasPrice: big.NewInt(1), |
|
|
|
pendingState: nil, |
|
|
|
pendingState: nil, |
|
|
|
localTx: newTxSet(), |
|
|
|
locals: newTxSet(), |
|
|
|
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), |
|
|
|
events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), |
|
|
|
quit: make(chan struct{}), |
|
|
|
quit: make(chan struct{}), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
pool.priced = newTxPricedList(&pool.all) |
|
|
|
pool.resetState() |
|
|
|
pool.resetState() |
|
|
|
|
|
|
|
|
|
|
|
pool.wg.Add(2) |
|
|
|
pool.wg.Add(2) |
|
|
@ -134,10 +139,22 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState |
|
|
|
func (pool *TxPool) eventLoop() { |
|
|
|
func (pool *TxPool) eventLoop() { |
|
|
|
defer pool.wg.Done() |
|
|
|
defer pool.wg.Done() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Start a ticker and keep track of interesting pool stats to report
|
|
|
|
|
|
|
|
var prevPending, prevQueued, prevStales int |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
report := time.NewTicker(statsReportInterval) |
|
|
|
|
|
|
|
defer report.Stop() |
|
|
|
|
|
|
|
|
|
|
|
// Track chain events. When a chain events occurs (new chain canon block)
|
|
|
|
// Track chain events. When a chain events occurs (new chain canon block)
|
|
|
|
// we need to know the new state. The new state will help us determine
|
|
|
|
// we need to know the new state. The new state will help us determine
|
|
|
|
// the nonces in the managed state
|
|
|
|
// the nonces in the managed state
|
|
|
|
for ev := range pool.events.Chan() { |
|
|
|
for { |
|
|
|
|
|
|
|
select { |
|
|
|
|
|
|
|
// Handle any events fired by the system
|
|
|
|
|
|
|
|
case ev, ok := <-pool.events.Chan(): |
|
|
|
|
|
|
|
if !ok { |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
switch ev := ev.Data.(type) { |
|
|
|
switch ev := ev.Data.(type) { |
|
|
|
case ChainHeadEvent: |
|
|
|
case ChainHeadEvent: |
|
|
|
pool.mu.Lock() |
|
|
|
pool.mu.Lock() |
|
|
@ -146,16 +163,25 @@ func (pool *TxPool) eventLoop() { |
|
|
|
pool.homestead = true |
|
|
|
pool.homestead = true |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pool.resetState() |
|
|
|
pool.resetState() |
|
|
|
pool.mu.Unlock() |
|
|
|
pool.mu.Unlock() |
|
|
|
case GasPriceChanged: |
|
|
|
|
|
|
|
pool.mu.Lock() |
|
|
|
|
|
|
|
pool.minGasPrice = ev.Price |
|
|
|
|
|
|
|
pool.mu.Unlock() |
|
|
|
|
|
|
|
case RemovedTransactionEvent: |
|
|
|
case RemovedTransactionEvent: |
|
|
|
pool.AddBatch(ev.Txs) |
|
|
|
pool.AddBatch(ev.Txs) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Handle stats reporting ticks
|
|
|
|
|
|
|
|
case <-report.C: |
|
|
|
|
|
|
|
pool.mu.RLock() |
|
|
|
|
|
|
|
pending, queued := pool.stats() |
|
|
|
|
|
|
|
stales := pool.priced.stales |
|
|
|
|
|
|
|
pool.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if pending != prevPending || queued != prevQueued || stales != prevStales { |
|
|
|
|
|
|
|
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) |
|
|
|
|
|
|
|
prevPending, prevQueued, prevStales = pending, queued, stales |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -191,6 +217,27 @@ func (pool *TxPool) Stop() { |
|
|
|
log.Info("Transaction pool stopped") |
|
|
|
log.Info("Transaction pool stopped") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// GasPrice returns the current gas price enforced by the transaction pool.
|
|
|
|
|
|
|
|
func (pool *TxPool) GasPrice() *big.Int { |
|
|
|
|
|
|
|
pool.mu.RLock() |
|
|
|
|
|
|
|
defer pool.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return new(big.Int).Set(pool.gasPrice) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// SetGasPrice updates the minimum price required by the transaction pool for a
|
|
|
|
|
|
|
|
// new transaction, and drops all transactions below this threshold.
|
|
|
|
|
|
|
|
func (pool *TxPool) SetGasPrice(price *big.Int) { |
|
|
|
|
|
|
|
pool.mu.Lock() |
|
|
|
|
|
|
|
defer pool.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pool.gasPrice = price |
|
|
|
|
|
|
|
for _, tx := range pool.priced.Cap(price, pool.locals) { |
|
|
|
|
|
|
|
pool.removeTx(tx.Hash()) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
log.Info("Transaction pool price threshold updated", "price", price) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (pool *TxPool) State() *state.ManagedState { |
|
|
|
func (pool *TxPool) State() *state.ManagedState { |
|
|
|
pool.mu.RLock() |
|
|
|
pool.mu.RLock() |
|
|
|
defer pool.mu.RUnlock() |
|
|
|
defer pool.mu.RUnlock() |
|
|
@ -200,17 +247,25 @@ func (pool *TxPool) State() *state.ManagedState { |
|
|
|
|
|
|
|
|
|
|
|
// Stats retrieves the current pool stats, namely the number of pending and the
|
|
|
|
// Stats retrieves the current pool stats, namely the number of pending and the
|
|
|
|
// number of queued (non-executable) transactions.
|
|
|
|
// number of queued (non-executable) transactions.
|
|
|
|
func (pool *TxPool) Stats() (pending int, queued int) { |
|
|
|
func (pool *TxPool) Stats() (int, int) { |
|
|
|
pool.mu.RLock() |
|
|
|
pool.mu.RLock() |
|
|
|
defer pool.mu.RUnlock() |
|
|
|
defer pool.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return pool.stats() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// stats retrieves the current pool stats, namely the number of pending and the
|
|
|
|
|
|
|
|
// number of queued (non-executable) transactions.
|
|
|
|
|
|
|
|
func (pool *TxPool) stats() (int, int) { |
|
|
|
|
|
|
|
pending := 0 |
|
|
|
for _, list := range pool.pending { |
|
|
|
for _, list := range pool.pending { |
|
|
|
pending += list.Len() |
|
|
|
pending += list.Len() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
queued := 0 |
|
|
|
for _, list := range pool.queue { |
|
|
|
for _, list := range pool.queue { |
|
|
|
queued += list.Len() |
|
|
|
queued += list.Len() |
|
|
|
} |
|
|
|
} |
|
|
|
return |
|
|
|
return pending, queued |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Content retrieves the data content of the transaction pool, returning all the
|
|
|
|
// Content retrieves the data content of the transaction pool, returning all the
|
|
|
@ -260,16 +315,16 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { |
|
|
|
func (pool *TxPool) SetLocal(tx *types.Transaction) { |
|
|
|
func (pool *TxPool) SetLocal(tx *types.Transaction) { |
|
|
|
pool.mu.Lock() |
|
|
|
pool.mu.Lock() |
|
|
|
defer pool.mu.Unlock() |
|
|
|
defer pool.mu.Unlock() |
|
|
|
pool.localTx.add(tx.Hash()) |
|
|
|
pool.locals.add(tx.Hash()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// validateTx checks whether a transaction is valid according
|
|
|
|
// validateTx checks whether a transaction is valid according
|
|
|
|
// to the consensus rules.
|
|
|
|
// to the consensus rules.
|
|
|
|
func (pool *TxPool) validateTx(tx *types.Transaction) error { |
|
|
|
func (pool *TxPool) validateTx(tx *types.Transaction) error { |
|
|
|
local := pool.localTx.contains(tx.Hash()) |
|
|
|
local := pool.locals.contains(tx.Hash()) |
|
|
|
// Drop transactions under our own minimal accepted gas price
|
|
|
|
// Drop transactions under our own minimal accepted gas price
|
|
|
|
if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 { |
|
|
|
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { |
|
|
|
return ErrCheap |
|
|
|
return ErrUnderpriced |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
currentState, err := pool.currentState() |
|
|
|
currentState, err := pool.currentState() |
|
|
@ -314,31 +369,72 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// add validates a transaction and inserts it into the non-executable queue for
|
|
|
|
// add validates a transaction and inserts it into the non-executable queue for
|
|
|
|
// later pending promotion and execution.
|
|
|
|
// later pending promotion and execution. If the transaction is a replacement for
|
|
|
|
func (pool *TxPool) add(tx *types.Transaction) error { |
|
|
|
// an already pending or queued one, it overwrites the previous and returns this
|
|
|
|
|
|
|
|
// so outer code doesn't uselessly call promote.
|
|
|
|
|
|
|
|
func (pool *TxPool) add(tx *types.Transaction) (bool, error) { |
|
|
|
// If the transaction is already known, discard it
|
|
|
|
// If the transaction is already known, discard it
|
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
if pool.all[hash] != nil { |
|
|
|
if pool.all[hash] != nil { |
|
|
|
log.Trace("Discarding already known transaction", "hash", hash) |
|
|
|
log.Trace("Discarding already known transaction", "hash", hash) |
|
|
|
return fmt.Errorf("known transaction: %x", hash) |
|
|
|
return false, fmt.Errorf("known transaction: %x", hash) |
|
|
|
} |
|
|
|
} |
|
|
|
// Otherwise ensure basic validation passes and queue it up
|
|
|
|
// If the transaction fails basic validation, discard it
|
|
|
|
if err := pool.validateTx(tx); err != nil { |
|
|
|
if err := pool.validateTx(tx); err != nil { |
|
|
|
log.Trace("Discarding invalid transaction", "hash", hash, "err", err) |
|
|
|
log.Trace("Discarding invalid transaction", "hash", hash, "err", err) |
|
|
|
invalidTxCounter.Inc(1) |
|
|
|
invalidTxCounter.Inc(1) |
|
|
|
return err |
|
|
|
return false, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// If the transaction pool is full, discard underpriced transactions
|
|
|
|
|
|
|
|
if uint64(len(pool.all)) >= maxPendingTotal+maxQueuedTotal { |
|
|
|
|
|
|
|
// If the new transaction is underpriced, don't accept it
|
|
|
|
|
|
|
|
if pool.priced.Underpriced(tx, pool.locals) { |
|
|
|
|
|
|
|
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) |
|
|
|
|
|
|
|
underpricedTxCounter.Inc(1) |
|
|
|
|
|
|
|
return false, ErrUnderpriced |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// New transaction is better than our worse ones, make room for it
|
|
|
|
|
|
|
|
drop := pool.priced.Discard(len(pool.all)-int(maxPendingTotal+maxQueuedTotal-1), pool.locals) |
|
|
|
|
|
|
|
for _, tx := range drop { |
|
|
|
|
|
|
|
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) |
|
|
|
|
|
|
|
underpricedTxCounter.Inc(1) |
|
|
|
|
|
|
|
pool.removeTx(tx.Hash()) |
|
|
|
} |
|
|
|
} |
|
|
|
pool.enqueueTx(hash, tx) |
|
|
|
} |
|
|
|
|
|
|
|
// If the transaction is replacing an already pending one, do directly
|
|
|
|
|
|
|
|
from, _ := types.Sender(pool.signer, tx) // already validated
|
|
|
|
|
|
|
|
if list := pool.pending[from]; list != nil && list.Overlaps(tx) { |
|
|
|
|
|
|
|
// Nonce already pending, check if required price bump is met
|
|
|
|
|
|
|
|
inserted, old := list.Add(tx) |
|
|
|
|
|
|
|
if !inserted { |
|
|
|
|
|
|
|
pendingDiscardCounter.Inc(1) |
|
|
|
|
|
|
|
return false, ErrReplaceUnderpriced |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// New transaction is better, replace old one
|
|
|
|
|
|
|
|
if old != nil { |
|
|
|
|
|
|
|
delete(pool.all, old.Hash()) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
|
|
|
|
pendingReplaceCounter.Inc(1) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
pool.all[tx.Hash()] = tx |
|
|
|
|
|
|
|
pool.priced.Put(tx) |
|
|
|
|
|
|
|
|
|
|
|
// Print a log message if low enough level is set
|
|
|
|
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) |
|
|
|
log.Debug("Pooled new transaction", "hash", hash, "from", log.Lazy{Fn: func() common.Address { from, _ := types.Sender(pool.signer, tx); return from }}, "to", tx.To()) |
|
|
|
return old != nil, nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// New transaction isn't replacing a pending one, push into queue
|
|
|
|
|
|
|
|
replace, err := pool.enqueueTx(hash, tx) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return false, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) |
|
|
|
|
|
|
|
return replace, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// enqueueTx inserts a new transaction into the non-executable transaction queue.
|
|
|
|
// enqueueTx inserts a new transaction into the non-executable transaction queue.
|
|
|
|
//
|
|
|
|
//
|
|
|
|
// Note, this method assumes the pool lock is held!
|
|
|
|
// Note, this method assumes the pool lock is held!
|
|
|
|
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { |
|
|
|
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) { |
|
|
|
// Try to insert the transaction into the future queue
|
|
|
|
// Try to insert the transaction into the future queue
|
|
|
|
from, _ := types.Sender(pool.signer, tx) // already validated
|
|
|
|
from, _ := types.Sender(pool.signer, tx) // already validated
|
|
|
|
if pool.queue[from] == nil { |
|
|
|
if pool.queue[from] == nil { |
|
|
@ -346,15 +442,19 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { |
|
|
|
} |
|
|
|
} |
|
|
|
inserted, old := pool.queue[from].Add(tx) |
|
|
|
inserted, old := pool.queue[from].Add(tx) |
|
|
|
if !inserted { |
|
|
|
if !inserted { |
|
|
|
|
|
|
|
// An older transaction was better, discard this
|
|
|
|
queuedDiscardCounter.Inc(1) |
|
|
|
queuedDiscardCounter.Inc(1) |
|
|
|
return // An older transaction was better, discard this
|
|
|
|
return false, ErrReplaceUnderpriced |
|
|
|
} |
|
|
|
} |
|
|
|
// Discard any previous transaction and mark this
|
|
|
|
// Discard any previous transaction and mark this
|
|
|
|
if old != nil { |
|
|
|
if old != nil { |
|
|
|
delete(pool.all, old.Hash()) |
|
|
|
delete(pool.all, old.Hash()) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
queuedReplaceCounter.Inc(1) |
|
|
|
queuedReplaceCounter.Inc(1) |
|
|
|
} |
|
|
|
} |
|
|
|
pool.all[hash] = tx |
|
|
|
pool.all[hash] = tx |
|
|
|
|
|
|
|
pool.priced.Put(tx) |
|
|
|
|
|
|
|
return old != nil, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// promoteTx adds a transaction to the pending (processable) list of transactions.
|
|
|
|
// promoteTx adds a transaction to the pending (processable) list of transactions.
|
|
|
@ -371,16 +471,23 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T |
|
|
|
if !inserted { |
|
|
|
if !inserted { |
|
|
|
// An older transaction was better, discard this
|
|
|
|
// An older transaction was better, discard this
|
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
|
|
|
|
|
|
|
|
pendingDiscardCounter.Inc(1) |
|
|
|
pendingDiscardCounter.Inc(1) |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
// Otherwise discard any previous transaction and mark this
|
|
|
|
// Otherwise discard any previous transaction and mark this
|
|
|
|
if old != nil { |
|
|
|
if old != nil { |
|
|
|
delete(pool.all, old.Hash()) |
|
|
|
delete(pool.all, old.Hash()) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
|
|
|
|
|
|
|
|
pendingReplaceCounter.Inc(1) |
|
|
|
pendingReplaceCounter.Inc(1) |
|
|
|
} |
|
|
|
} |
|
|
|
pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
|
|
|
|
// Failsafe to work around direct pending inserts (tests)
|
|
|
|
|
|
|
|
if pool.all[hash] == nil { |
|
|
|
|
|
|
|
pool.all[hash] = tx |
|
|
|
|
|
|
|
pool.priced.Put(tx) |
|
|
|
|
|
|
|
} |
|
|
|
// Set the potentially new pending nonce and notify any subsystems of the new tx
|
|
|
|
// Set the potentially new pending nonce and notify any subsystems of the new tx
|
|
|
|
pool.beats[addr] = time.Now() |
|
|
|
pool.beats[addr] = time.Now() |
|
|
|
pool.pendingState.SetNonce(addr, tx.Nonce()+1) |
|
|
|
pool.pendingState.SetNonce(addr, tx.Nonce()+1) |
|
|
@ -392,16 +499,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error { |
|
|
|
pool.mu.Lock() |
|
|
|
pool.mu.Lock() |
|
|
|
defer pool.mu.Unlock() |
|
|
|
defer pool.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
if err := pool.add(tx); err != nil { |
|
|
|
// Try to inject the transaction and update any state
|
|
|
|
|
|
|
|
replace, err := pool.add(tx) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
state, err := pool.currentState() |
|
|
|
state, err := pool.currentState() |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If we added a new transaction, run promotion checks and return
|
|
|
|
|
|
|
|
if !replace { |
|
|
|
pool.promoteExecutables(state) |
|
|
|
pool.promoteExecutables(state) |
|
|
|
|
|
|
|
} |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -411,10 +521,13 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error { |
|
|
|
defer pool.mu.Unlock() |
|
|
|
defer pool.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Add the batch of transaction, tracking the accepted ones
|
|
|
|
// Add the batch of transaction, tracking the accepted ones
|
|
|
|
added := 0 |
|
|
|
replaced, added := true, 0 |
|
|
|
for _, tx := range txs { |
|
|
|
for _, tx := range txs { |
|
|
|
if err := pool.add(tx); err == nil { |
|
|
|
if replace, err := pool.add(tx); err == nil { |
|
|
|
added++ |
|
|
|
added++ |
|
|
|
|
|
|
|
if !replace { |
|
|
|
|
|
|
|
replaced = false |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
// Only reprocess the internal state if something was actually added
|
|
|
|
// Only reprocess the internal state if something was actually added
|
|
|
@ -423,8 +536,10 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error { |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if !replaced { |
|
|
|
pool.promoteExecutables(state) |
|
|
|
pool.promoteExecutables(state) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -467,6 +582,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { |
|
|
|
|
|
|
|
|
|
|
|
// Remove it from the list of known transactions
|
|
|
|
// Remove it from the list of known transactions
|
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
|
|
|
|
|
|
|
|
// Remove the transaction from the pending lists and reset the account nonce
|
|
|
|
// Remove the transaction from the pending lists and reset the account nonce
|
|
|
|
if pending := pool.pending[addr]; pending != nil { |
|
|
|
if pending := pool.pending[addr]; pending != nil { |
|
|
@ -506,28 +622,31 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { |
|
|
|
// Drop all transactions that are deemed too old (low nonce)
|
|
|
|
// Drop all transactions that are deemed too old (low nonce)
|
|
|
|
for _, tx := range list.Forward(state.GetNonce(addr)) { |
|
|
|
for _, tx := range list.Forward(state.GetNonce(addr)) { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Removed old queued transaction", "hash", hash) |
|
|
|
log.Trace("Removed old queued transaction", "hash", hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
} |
|
|
|
} |
|
|
|
// Drop all transactions that are too costly (low balance)
|
|
|
|
// Drop all transactions that are too costly (low balance)
|
|
|
|
drops, _ := list.Filter(state.GetBalance(addr)) |
|
|
|
drops, _ := list.Filter(state.GetBalance(addr)) |
|
|
|
for _, tx := range drops { |
|
|
|
for _, tx := range drops { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Removed unpayable queued transaction", "hash", hash) |
|
|
|
log.Trace("Removed unpayable queued transaction", "hash", hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
queuedNofundsCounter.Inc(1) |
|
|
|
queuedNofundsCounter.Inc(1) |
|
|
|
} |
|
|
|
} |
|
|
|
// Gather all executable transactions and promote them
|
|
|
|
// Gather all executable transactions and promote them
|
|
|
|
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { |
|
|
|
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Promoting queued transaction", "hash", hash) |
|
|
|
log.Trace("Promoting queued transaction", "hash", hash) |
|
|
|
pool.promoteTx(addr, hash, tx) |
|
|
|
pool.promoteTx(addr, hash, tx) |
|
|
|
} |
|
|
|
} |
|
|
|
// Drop all transactions over the allowed limit
|
|
|
|
// Drop all transactions over the allowed limit
|
|
|
|
for _, tx := range list.Cap(int(maxQueuedPerAccount)) { |
|
|
|
for _, tx := range list.Cap(int(maxQueuedPerAccount)) { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Removed cap-exceeding queued transaction", "hash", hash) |
|
|
|
log.Trace("Removed cap-exceeding queued transaction", "hash", hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
queuedRLCounter.Inc(1) |
|
|
|
queuedRLCounter.Inc(1) |
|
|
|
} |
|
|
|
} |
|
|
|
queued += uint64(list.Len()) |
|
|
|
queued += uint64(list.Len()) |
|
|
@ -551,7 +670,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { |
|
|
|
if uint64(list.Len()) > minPendingPerAccount { |
|
|
|
if uint64(list.Len()) > minPendingPerAccount { |
|
|
|
// Skip local accounts as pools should maintain backlogs for themselves
|
|
|
|
// Skip local accounts as pools should maintain backlogs for themselves
|
|
|
|
for _, tx := range list.txs.items { |
|
|
|
for _, tx := range list.txs.items { |
|
|
|
if !pool.localTx.contains(tx.Hash()) { |
|
|
|
if !pool.locals.contains(tx.Hash()) { |
|
|
|
spammers.Push(addr, float32(list.Len())) |
|
|
|
spammers.Push(addr, float32(list.Len())) |
|
|
|
} |
|
|
|
} |
|
|
|
break // Checking on transaction for locality is enough
|
|
|
|
break // Checking on transaction for locality is enough
|
|
|
@ -593,7 +712,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { |
|
|
|
pendingRLCounter.Inc(int64(pendingBeforeCap - pending)) |
|
|
|
pendingRLCounter.Inc(int64(pendingBeforeCap - pending)) |
|
|
|
} |
|
|
|
} |
|
|
|
// If we've queued more transactions than the hard limit, drop oldest ones
|
|
|
|
// If we've queued more transactions than the hard limit, drop oldest ones
|
|
|
|
if queued > maxQueuedInTotal { |
|
|
|
if queued > maxQueuedTotal { |
|
|
|
// Sort all accounts with queued transactions by heartbeat
|
|
|
|
// Sort all accounts with queued transactions by heartbeat
|
|
|
|
addresses := make(addresssByHeartbeat, 0, len(pool.queue)) |
|
|
|
addresses := make(addresssByHeartbeat, 0, len(pool.queue)) |
|
|
|
for addr := range pool.queue { |
|
|
|
for addr := range pool.queue { |
|
|
@ -602,7 +721,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { |
|
|
|
sort.Sort(addresses) |
|
|
|
sort.Sort(addresses) |
|
|
|
|
|
|
|
|
|
|
|
// Drop transactions until the total is below the limit
|
|
|
|
// Drop transactions until the total is below the limit
|
|
|
|
for drop := queued - maxQueuedInTotal; drop > 0; { |
|
|
|
for drop := queued - maxQueuedTotal; drop > 0; { |
|
|
|
addr := addresses[len(addresses)-1] |
|
|
|
addr := addresses[len(addresses)-1] |
|
|
|
list := pool.queue[addr.address] |
|
|
|
list := pool.queue[addr.address] |
|
|
|
|
|
|
|
|
|
|
@ -639,20 +758,22 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { |
|
|
|
// Drop all transactions that are deemed too old (low nonce)
|
|
|
|
// Drop all transactions that are deemed too old (low nonce)
|
|
|
|
for _, tx := range list.Forward(nonce) { |
|
|
|
for _, tx := range list.Forward(nonce) { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Removed old pending transaction", "hash", hash) |
|
|
|
log.Trace("Removed old pending transaction", "hash", hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
} |
|
|
|
} |
|
|
|
// Drop all transactions that are too costly (low balance), and queue any invalids back for later
|
|
|
|
// Drop all transactions that are too costly (low balance), and queue any invalids back for later
|
|
|
|
drops, invalids := list.Filter(state.GetBalance(addr)) |
|
|
|
drops, invalids := list.Filter(state.GetBalance(addr)) |
|
|
|
for _, tx := range drops { |
|
|
|
for _, tx := range drops { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Removed unpayable pending transaction", "hash", hash) |
|
|
|
log.Trace("Removed unpayable pending transaction", "hash", hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
pool.priced.Removed() |
|
|
|
pendingNofundsCounter.Inc(1) |
|
|
|
pendingNofundsCounter.Inc(1) |
|
|
|
} |
|
|
|
} |
|
|
|
for _, tx := range invalids { |
|
|
|
for _, tx := range invalids { |
|
|
|
hash := tx.Hash() |
|
|
|
hash := tx.Hash() |
|
|
|
log.Debug("Demoting pending transaction", "hash", hash) |
|
|
|
log.Trace("Demoting pending transaction", "hash", hash) |
|
|
|
pool.enqueueTx(hash, tx) |
|
|
|
pool.enqueueTx(hash, tx) |
|
|
|
} |
|
|
|
} |
|
|
|
// Delete the entire queue entry if it became empty.
|
|
|
|
// Delete the entire queue entry if it became empty.
|
|
|
|