|
|
|
@ -51,6 +51,11 @@ const ( |
|
|
|
|
|
|
|
|
|
type stateFn func() (*state.StateDB, error) |
|
|
|
|
|
|
|
|
|
// TxList is a "list" of transactions belonging to an account, sorted by account
|
|
|
|
|
// nonce. To allow gaps and avoid constant copying, the list is represented as a
|
|
|
|
|
// hash map.
|
|
|
|
|
type TxList map[uint64]*types.Transaction |
|
|
|
|
|
|
|
|
|
// TxPool contains all currently known transactions. Transactions
|
|
|
|
|
// enter the pool when they are received from the network or submitted
|
|
|
|
|
// locally. They exit the pool when they are included in the blockchain.
|
|
|
|
@ -68,8 +73,10 @@ type TxPool struct { |
|
|
|
|
events event.Subscription |
|
|
|
|
localTx *txSet |
|
|
|
|
mu sync.RWMutex |
|
|
|
|
pending map[common.Hash]*types.Transaction // processable transactions
|
|
|
|
|
queue map[common.Address]map[common.Hash]*types.Transaction |
|
|
|
|
|
|
|
|
|
pending map[common.Address]TxList // All currently processable transactions
|
|
|
|
|
queue map[common.Address]TxList // Queued but non-processable transactions
|
|
|
|
|
all map[common.Hash]*types.Transaction // All transactions to allow lookups
|
|
|
|
|
|
|
|
|
|
wg sync.WaitGroup // for shutdown sync
|
|
|
|
|
|
|
|
|
@ -79,8 +86,9 @@ type TxPool struct { |
|
|
|
|
func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { |
|
|
|
|
pool := &TxPool{ |
|
|
|
|
config: config, |
|
|
|
|
pending: make(map[common.Hash]*types.Transaction), |
|
|
|
|
queue: make(map[common.Address]map[common.Hash]*types.Transaction), |
|
|
|
|
pending: make(map[common.Address]TxList), |
|
|
|
|
queue: make(map[common.Address]TxList), |
|
|
|
|
all: make(map[common.Hash]*types.Transaction), |
|
|
|
|
eventMux: eventMux, |
|
|
|
|
currentState: currentStateFn, |
|
|
|
|
gasLimit: gasLimitFn, |
|
|
|
@ -143,12 +151,12 @@ func (pool *TxPool) resetState() { |
|
|
|
|
|
|
|
|
|
// Loop over the pending transactions and base the nonce of the new
|
|
|
|
|
// pending transaction set.
|
|
|
|
|
for _, tx := range pool.pending { |
|
|
|
|
if addr, err := tx.From(); err == nil { |
|
|
|
|
for addr, txs := range pool.pending { |
|
|
|
|
// Set the nonce. Transaction nonce can never be lower
|
|
|
|
|
// than the state nonce; validatePool took care of that.
|
|
|
|
|
if pool.pendingState.GetNonce(addr) <= tx.Nonce() { |
|
|
|
|
pool.pendingState.SetNonce(addr, tx.Nonce()+1) |
|
|
|
|
for nonce, _ := range txs { |
|
|
|
|
if pool.pendingState.GetNonce(addr) <= nonce { |
|
|
|
|
pool.pendingState.SetNonce(addr, nonce+1) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -174,7 +182,9 @@ func (pool *TxPool) Stats() (pending int, queued int) { |
|
|
|
|
pool.mu.RLock() |
|
|
|
|
defer pool.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
pending = len(pool.pending) |
|
|
|
|
for _, txs := range pool.pending { |
|
|
|
|
pending += len(txs) |
|
|
|
|
} |
|
|
|
|
for _, txs := range pool.queue { |
|
|
|
|
queued += len(txs) |
|
|
|
|
} |
|
|
|
@ -183,30 +193,27 @@ func (pool *TxPool) Stats() (pending int, queued int) { |
|
|
|
|
|
|
|
|
|
// Content retrieves the data content of the transaction pool, returning all the
|
|
|
|
|
// pending as well as queued transactions, grouped by account and nonce.
|
|
|
|
|
func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) { |
|
|
|
|
func (pool *TxPool) Content() (map[common.Address]TxList, map[common.Address]TxList) { |
|
|
|
|
pool.mu.RLock() |
|
|
|
|
defer pool.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
// Retrieve all the pending transactions and sort by account and by nonce
|
|
|
|
|
pending := make(map[common.Address]map[uint64][]*types.Transaction) |
|
|
|
|
for _, tx := range pool.pending { |
|
|
|
|
account, _ := tx.From() |
|
|
|
|
|
|
|
|
|
owned, ok := pending[account] |
|
|
|
|
if !ok { |
|
|
|
|
owned = make(map[uint64][]*types.Transaction) |
|
|
|
|
pending[account] = owned |
|
|
|
|
pending := make(map[common.Address]TxList) |
|
|
|
|
for addr, txs := range pool.pending { |
|
|
|
|
copy := make(TxList) |
|
|
|
|
for nonce, tx := range txs { |
|
|
|
|
copy[nonce] = tx |
|
|
|
|
} |
|
|
|
|
owned[tx.Nonce()] = append(owned[tx.Nonce()], tx) |
|
|
|
|
pending[addr] = copy |
|
|
|
|
} |
|
|
|
|
// Retrieve all the queued transactions and sort by account and by nonce
|
|
|
|
|
queued := make(map[common.Address]map[uint64][]*types.Transaction) |
|
|
|
|
for account, txs := range pool.queue { |
|
|
|
|
owned := make(map[uint64][]*types.Transaction) |
|
|
|
|
for _, tx := range txs { |
|
|
|
|
owned[tx.Nonce()] = append(owned[tx.Nonce()], tx) |
|
|
|
|
queued := make(map[common.Address]TxList) |
|
|
|
|
for addr, txs := range pool.queue { |
|
|
|
|
copy := make(TxList) |
|
|
|
|
for nonce, tx := range txs { |
|
|
|
|
copy[nonce] = tx |
|
|
|
|
} |
|
|
|
|
queued[account] = owned |
|
|
|
|
queued[addr] = copy |
|
|
|
|
} |
|
|
|
|
return pending, queued |
|
|
|
|
} |
|
|
|
@ -280,7 +287,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { |
|
|
|
|
func (self *TxPool) add(tx *types.Transaction) error { |
|
|
|
|
hash := tx.Hash() |
|
|
|
|
|
|
|
|
|
if self.pending[hash] != nil { |
|
|
|
|
if self.all[hash] != nil { |
|
|
|
|
return fmt.Errorf("Known transaction (%x)", hash[:4]) |
|
|
|
|
} |
|
|
|
|
err := self.validateTx(tx) |
|
|
|
@ -306,33 +313,63 @@ func (self *TxPool) add(tx *types.Transaction) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// queueTx will queue an unknown transaction
|
|
|
|
|
// queueTx will queue an unknown transaction.
|
|
|
|
|
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { |
|
|
|
|
from, _ := tx.From() // already validated
|
|
|
|
|
if self.queue[from] == nil { |
|
|
|
|
self.queue[from] = make(map[common.Hash]*types.Transaction) |
|
|
|
|
addr, _ := tx.From() // already validated
|
|
|
|
|
if self.queue[addr] == nil { |
|
|
|
|
self.queue[addr] = make(TxList) |
|
|
|
|
} |
|
|
|
|
self.queue[from][hash] = tx |
|
|
|
|
// If the nonce is already used, discard the lower priced transaction
|
|
|
|
|
nonce := tx.Nonce() |
|
|
|
|
|
|
|
|
|
if old, ok := self.queue[addr][nonce]; ok { |
|
|
|
|
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 { |
|
|
|
|
return // Old was better, discard this
|
|
|
|
|
} |
|
|
|
|
delete(self.all, old.Hash()) // New is better, drop and overwrite old one
|
|
|
|
|
} |
|
|
|
|
self.queue[addr][nonce] = tx |
|
|
|
|
self.all[hash] = tx |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// addTx will add a transaction to the pending (processable queue) list of transactions
|
|
|
|
|
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { |
|
|
|
|
// init delayed since tx pool could have been started before any state sync
|
|
|
|
|
// addTx will moves a transaction from the non-executable queue to the pending
|
|
|
|
|
// (processable) list of transactions.
|
|
|
|
|
func (pool *TxPool) addTx(addr common.Address, tx *types.Transaction) { |
|
|
|
|
// Init delayed since tx pool could have been started before any state sync
|
|
|
|
|
if pool.pendingState == nil { |
|
|
|
|
pool.resetState() |
|
|
|
|
} |
|
|
|
|
// If the nonce is already used, discard the lower priced transaction
|
|
|
|
|
hash, nonce := tx.Hash(), tx.Nonce() |
|
|
|
|
|
|
|
|
|
if _, ok := pool.pending[hash]; !ok { |
|
|
|
|
pool.pending[hash] = tx |
|
|
|
|
if old, ok := pool.pending[addr][nonce]; ok { |
|
|
|
|
oldHash := old.Hash() |
|
|
|
|
|
|
|
|
|
switch { |
|
|
|
|
case oldHash == hash: // Nothing changed, noop
|
|
|
|
|
return |
|
|
|
|
case old.GasPrice().Cmp(tx.GasPrice()) >= 0: // Old was better, discard this
|
|
|
|
|
delete(pool.all, hash) |
|
|
|
|
return |
|
|
|
|
default: // New is better, discard old
|
|
|
|
|
delete(pool.all, oldHash) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// The transaction is being kept, insert it into the tx pool
|
|
|
|
|
if _, ok := pool.pending[addr]; !ok { |
|
|
|
|
pool.pending[addr] = make(TxList) |
|
|
|
|
} |
|
|
|
|
pool.pending[addr][nonce] = tx |
|
|
|
|
pool.all[hash] = tx |
|
|
|
|
|
|
|
|
|
// Increment the nonce on the pending state. This can only happen if
|
|
|
|
|
// the nonce is +1 to the previous one.
|
|
|
|
|
pool.pendingState.SetNonce(addr, tx.Nonce()+1) |
|
|
|
|
pool.pendingState.SetNonce(addr, nonce+1) |
|
|
|
|
|
|
|
|
|
// Notify the subscribers. This event is posted in a goroutine
|
|
|
|
|
// because it's possible that somewhere during the post "Remove transaction"
|
|
|
|
|
// gets called which will then wait for the global tx pool lock and deadlock.
|
|
|
|
|
go pool.eventMux.Post(TxPreEvent{tx}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Add queues a single transaction in the pool if it is valid.
|
|
|
|
@ -371,58 +408,39 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { |
|
|
|
|
tp.mu.RLock() |
|
|
|
|
defer tp.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
// check the txs first
|
|
|
|
|
if tx, ok := tp.pending[hash]; ok { |
|
|
|
|
return tx |
|
|
|
|
} |
|
|
|
|
// check queue
|
|
|
|
|
for _, txs := range tp.queue { |
|
|
|
|
if tx, ok := txs[hash]; ok { |
|
|
|
|
return tx |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
return tp.all[hash] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetTransactions returns all currently processable transactions.
|
|
|
|
|
// The returned slice may be modified by the caller.
|
|
|
|
|
func (self *TxPool) GetTransactions() (txs types.Transactions) { |
|
|
|
|
func (self *TxPool) GetTransactions() types.Transactions { |
|
|
|
|
self.mu.Lock() |
|
|
|
|
defer self.mu.Unlock() |
|
|
|
|
|
|
|
|
|
// check queue first
|
|
|
|
|
self.checkQueue() |
|
|
|
|
|
|
|
|
|
// invalidate any txs
|
|
|
|
|
self.validatePool() |
|
|
|
|
|
|
|
|
|
txs = make(types.Transactions, len(self.pending)) |
|
|
|
|
i := 0 |
|
|
|
|
for _, tx := range self.pending { |
|
|
|
|
txs[i] = tx |
|
|
|
|
i++ |
|
|
|
|
count := 0 |
|
|
|
|
for _, txs := range self.pending { |
|
|
|
|
count += len(txs) |
|
|
|
|
} |
|
|
|
|
return txs |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetQueuedTransactions returns all non-processable transactions.
|
|
|
|
|
func (self *TxPool) GetQueuedTransactions() types.Transactions { |
|
|
|
|
self.mu.RLock() |
|
|
|
|
defer self.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
var ret types.Transactions |
|
|
|
|
for _, txs := range self.queue { |
|
|
|
|
pending := make(types.Transactions, 0, count) |
|
|
|
|
for _, txs := range self.pending { |
|
|
|
|
for _, tx := range txs { |
|
|
|
|
ret = append(ret, tx) |
|
|
|
|
pending = append(pending, tx) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
sort.Sort(types.TxByNonce(ret)) |
|
|
|
|
return ret |
|
|
|
|
return pending |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RemoveTransactions removes all given transactions from the pool.
|
|
|
|
|
func (self *TxPool) RemoveTransactions(txs types.Transactions) { |
|
|
|
|
self.mu.Lock() |
|
|
|
|
defer self.mu.Unlock() |
|
|
|
|
|
|
|
|
|
for _, tx := range txs { |
|
|
|
|
self.removeTx(tx.Hash()) |
|
|
|
|
} |
|
|
|
@ -432,29 +450,35 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { |
|
|
|
|
func (pool *TxPool) RemoveTx(hash common.Hash) { |
|
|
|
|
pool.mu.Lock() |
|
|
|
|
defer pool.mu.Unlock() |
|
|
|
|
|
|
|
|
|
pool.removeTx(hash) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pool *TxPool) removeTx(hash common.Hash) { |
|
|
|
|
// delete from pending pool
|
|
|
|
|
delete(pool.pending, hash) |
|
|
|
|
// delete from queue
|
|
|
|
|
for address, txs := range pool.queue { |
|
|
|
|
if _, ok := txs[hash]; ok { |
|
|
|
|
if len(txs) == 1 { |
|
|
|
|
// if only one tx, remove entire address entry.
|
|
|
|
|
delete(pool.queue, address) |
|
|
|
|
} else { |
|
|
|
|
delete(txs, hash) |
|
|
|
|
// Fetch the transaction we wish to delete
|
|
|
|
|
tx, ok := pool.all[hash] |
|
|
|
|
if !ok { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
|
addr, _ := tx.From() |
|
|
|
|
|
|
|
|
|
// Remove it from all internal lists
|
|
|
|
|
delete(pool.all, hash) |
|
|
|
|
|
|
|
|
|
delete(pool.pending[addr], tx.Nonce()) |
|
|
|
|
if len(pool.pending[addr]) == 0 { |
|
|
|
|
delete(pool.pending, addr) |
|
|
|
|
} |
|
|
|
|
delete(pool.queue[addr], tx.Nonce()) |
|
|
|
|
if len(pool.queue[addr]) == 0 { |
|
|
|
|
delete(pool.queue, addr) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// checkQueue moves transactions that have become processable to main pool.
|
|
|
|
|
// checkQueue moves transactions that have become processable from the pool's
|
|
|
|
|
// queue to the set of pending transactions.
|
|
|
|
|
func (pool *TxPool) checkQueue() { |
|
|
|
|
// init delayed since tx pool could have been started before any state sync
|
|
|
|
|
// Init delayed since tx pool could have been started before any state sync
|
|
|
|
|
if pool.pendingState == nil { |
|
|
|
|
pool.resetState() |
|
|
|
|
} |
|
|
|
@ -473,17 +497,19 @@ func (pool *TxPool) checkQueue() { |
|
|
|
|
trueNonce = currentState.GetNonce(address) // nonce known by the last state
|
|
|
|
|
) |
|
|
|
|
promote = promote[:0] |
|
|
|
|
for hash, tx := range txs { |
|
|
|
|
for nonce, tx := range txs { |
|
|
|
|
// Drop processed or out of fund transactions
|
|
|
|
|
if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 { |
|
|
|
|
if nonce < trueNonce || balance.Cmp(tx.Cost()) < 0 { |
|
|
|
|
if glog.V(logger.Core) { |
|
|
|
|
glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx) |
|
|
|
|
} |
|
|
|
|
delete(txs, hash) |
|
|
|
|
delete(txs, nonce) |
|
|
|
|
delete(pool.all, tx.Hash()) |
|
|
|
|
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Collect the remaining transactions for the next pass.
|
|
|
|
|
promote = append(promote, txQueueEntry{hash, address, tx}) |
|
|
|
|
promote = append(promote, txQueueEntry{address, tx}) |
|
|
|
|
} |
|
|
|
|
// Find the next consecutive nonce range starting at the current account nonce,
|
|
|
|
|
// pushing the guessed nonce forward if we add consecutive transactions.
|
|
|
|
@ -493,17 +519,18 @@ func (pool *TxPool) checkQueue() { |
|
|
|
|
if entry.Nonce() > guessedNonce { |
|
|
|
|
if len(promote)-i > maxQueued { |
|
|
|
|
if glog.V(logger.Debug) { |
|
|
|
|
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:])) |
|
|
|
|
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.Hash().Bytes())) |
|
|
|
|
} |
|
|
|
|
for _, drop := range promote[i+maxQueued:] { |
|
|
|
|
delete(txs, drop.hash) |
|
|
|
|
delete(txs, drop.Nonce()) |
|
|
|
|
delete(pool.all, drop.Hash()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
// Otherwise promote the transaction and move the guess nonce if needed
|
|
|
|
|
pool.addTx(entry.hash, address, entry.Transaction) |
|
|
|
|
delete(txs, entry.hash) |
|
|
|
|
pool.addTx(address, entry.Transaction) |
|
|
|
|
delete(txs, entry.Nonce()) |
|
|
|
|
|
|
|
|
|
if entry.Nonce() == guessedNonce { |
|
|
|
|
guessedNonce++ |
|
|
|
@ -532,40 +559,48 @@ func (pool *TxPool) validatePool() { |
|
|
|
|
// Clean up the pending pool, accumulating invalid nonces
|
|
|
|
|
gaps := make(map[common.Address]uint64) |
|
|
|
|
|
|
|
|
|
for hash, tx := range pool.pending { |
|
|
|
|
sender, _ := tx.From() // err already checked
|
|
|
|
|
|
|
|
|
|
for addr, txs := range pool.pending { |
|
|
|
|
for nonce, tx := range txs { |
|
|
|
|
// Perform light nonce and balance validation
|
|
|
|
|
balance := balanceCache[sender] |
|
|
|
|
balance := balanceCache[addr] |
|
|
|
|
if balance == nil { |
|
|
|
|
balance = state.GetBalance(sender) |
|
|
|
|
balanceCache[sender] = balance |
|
|
|
|
balance = state.GetBalance(addr) |
|
|
|
|
balanceCache[addr] = balance |
|
|
|
|
} |
|
|
|
|
if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 { |
|
|
|
|
if past := state.GetNonce(addr) > nonce; past || balance.Cmp(tx.Cost()) < 0 { |
|
|
|
|
// Remove an already past it invalidated transaction
|
|
|
|
|
if glog.V(logger.Core) { |
|
|
|
|
glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) |
|
|
|
|
} |
|
|
|
|
delete(pool.pending, hash) |
|
|
|
|
delete(pool.pending[addr], nonce) |
|
|
|
|
if len(pool.pending[addr]) == 0 { |
|
|
|
|
delete(pool.pending, addr) |
|
|
|
|
} |
|
|
|
|
delete(pool.all, tx.Hash()) |
|
|
|
|
|
|
|
|
|
// Track the smallest invalid nonce to postpone subsequent transactions
|
|
|
|
|
if !past { |
|
|
|
|
if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev { |
|
|
|
|
gaps[sender] = tx.Nonce() |
|
|
|
|
if prev, ok := gaps[addr]; !ok || nonce < prev { |
|
|
|
|
gaps[addr] = nonce |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Move all transactions after a gap back to the future queue
|
|
|
|
|
if len(gaps) > 0 { |
|
|
|
|
for hash, tx := range pool.pending { |
|
|
|
|
sender, _ := tx.From() |
|
|
|
|
if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap { |
|
|
|
|
for addr, txs := range pool.pending { |
|
|
|
|
for nonce, tx := range txs { |
|
|
|
|
if gap, ok := gaps[addr]; ok && nonce >= gap { |
|
|
|
|
if glog.V(logger.Core) { |
|
|
|
|
glog.Infof("postponed tx (%v) due to introduced gap\n", tx) |
|
|
|
|
} |
|
|
|
|
pool.queueTx(hash, tx) |
|
|
|
|
delete(pool.pending, hash) |
|
|
|
|
delete(pool.pending[addr], nonce) |
|
|
|
|
if len(pool.pending[addr]) == 0 { |
|
|
|
|
delete(pool.pending, addr) |
|
|
|
|
} |
|
|
|
|
pool.queueTx(tx.Hash(), tx) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -574,7 +609,6 @@ func (pool *TxPool) validatePool() { |
|
|
|
|
type txQueue []txQueueEntry |
|
|
|
|
|
|
|
|
|
type txQueueEntry struct { |
|
|
|
|
hash common.Hash |
|
|
|
|
addr common.Address |
|
|
|
|
*types.Transaction |
|
|
|
|
} |
|
|
|
|