diff --git a/core/types/transaction.go b/core/types/transaction.go index 642b125a30..b826a8900a 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -368,41 +368,58 @@ func (s *TxByPrice) Pop() interface{} { return x } -// SortByPriceAndNonce sorts the transactions by price in such a way that the -// nonce orderings within a single account are maintained. -// -// Note, this is not as trivial as it seems from the first look as there are three -// different criteria that need to be taken into account (price, nonce, account -// match), which cannot be done with any plain sorting method, as certain items -// cannot be compared without context. +// TransactionsByPriceAndNonce represents a set of transactions that can return +// transactions in a profit-maximising sorted order, while supporting removing +// entire batches of transactions for non-executable accounts. +type TransactionsByPriceAndNonce struct { + txs map[common.Address]Transactions // Per account nonce-sorted list of transactions + heads TxByPrice // Next transaction for each unique account (price heap) +} + +// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve +// price sorted transactions in a nonce-honouring way. // -// This method first sorts the separates the list of transactions into individual -// sender accounts and sorts them by nonce. After the account nonce ordering is -// satisfied, the results are merged back together by price, always comparing only -// the head transaction from each account. This is done via a heap to keep it fast. -func SortByPriceAndNonce(txs map[common.Address]Transactions) Transactions { +// Note, the input map is reowned so the caller should not interact any more with +// if after providng it to the constructor. +func NewTransactionsByPriceAndNonce(txs map[common.Address]Transactions) *TransactionsByPriceAndNonce { // Initialize a price based heap with the head transactions - byPrice := make(TxByPrice, 0, len(txs)) + heads := make(TxByPrice, 0, len(txs)) for acc, accTxs := range txs { - byPrice = append(byPrice, accTxs[0]) + heads = append(heads, accTxs[0]) txs[acc] = accTxs[1:] } - heap.Init(&byPrice) - - // Merge by replacing the best with the next from the same account - var sorted Transactions - for len(byPrice) > 0 { - // Retrieve the next best transaction by price - best := heap.Pop(&byPrice).(*Transaction) - - // Push in its place the next transaction from the same account - acc, _ := best.From() // we only sort valid txs so this cannot fail - if accTxs, ok := txs[acc]; ok && len(accTxs) > 0 { - heap.Push(&byPrice, accTxs[0]) - txs[acc] = accTxs[1:] - } - // Accumulate the best priced transaction - sorted = append(sorted, best) + heap.Init(&heads) + + // Assemble and return the transaction set + return &TransactionsByPriceAndNonce{ + txs: txs, + heads: heads, + } +} + +// Peek returns the next transaction by price. +func (t *TransactionsByPriceAndNonce) Peek() *Transaction { + if len(t.heads) == 0 { + return nil + } + return t.heads[0] +} + +// Shift replaces the current best head with the next one from the same account. +func (t *TransactionsByPriceAndNonce) Shift() { + acc, _ := t.heads[0].From() // we only sort valid txs so this cannot fail + + if txs, ok := t.txs[acc]; ok && len(txs) > 0 { + t.heads[0], t.txs[acc] = txs[0], txs[1:] + heap.Fix(&t.heads, 0) + } else { + heap.Pop(&t.heads) } - return sorted +} + +// Pop removes the best transaction, *not* replacing it with the next one from +// the same account. This should be used when a transaction cannot be executed +// and hence all subsequent ones should be discarded from the same account. +func (t *TransactionsByPriceAndNonce) Pop() { + heap.Pop(&t.heads) } diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index c6e6e37901..8b0b02c3ea 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -137,7 +137,16 @@ func TestTransactionPriceNonceSort(t *testing.T) { } } // Sort the transactions and cross check the nonce ordering - txs := SortByPriceAndNonce(groups) + txset := NewTransactionsByPriceAndNonce(groups) + + txs := Transactions{} + for { + if tx := txset.Peek(); tx != nil { + txs = append(txs, tx) + txset.Shift() + } + break + } for i, txi := range txs { fromi, _ := txi.From() diff --git a/miner/worker.go b/miner/worker.go index 32f98bb67f..4d8878fabd 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -63,18 +63,16 @@ type uint64RingBuffer struct { // environment is the workers current environment and holds // all of the current state information type Work struct { - config *core.ChainConfig - state *state.StateDB // apply state changes here - ancestors *set.Set // ancestor set (used for checking uncle parent validity) - family *set.Set // family set (used for checking uncle invalidity) - uncles *set.Set // uncle set - tcount int // tx count in cycle - ignoredTransactors *set.Set - lowGasTransactors *set.Set - ownedAccounts *set.Set - lowGasTxs types.Transactions - failedTxs types.Transactions - localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion) + config *core.ChainConfig + state *state.StateDB // apply state changes here + ancestors *set.Set // ancestor set (used for checking uncle parent validity) + family *set.Set // family set (used for checking uncle invalidity) + uncles *set.Set // uncle set + tcount int // tx count in cycle + ownedAccounts *set.Set + lowGasTxs types.Transactions + failedTxs types.Transactions + localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion) Block *types.Block // the new block @@ -236,7 +234,12 @@ func (self *worker) update() { // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain) + + acc, _ := ev.Tx.From() + txs := map[common.Address]types.Transactions{acc: types.Transactions{ev.Tx}} + txset := types.NewTransactionsByPriceAndNonce(txs) + + self.current.commitTransactions(self.mux, txset, self.gasPrice, self.chain) self.currentMu.Unlock() } } @@ -384,8 +387,6 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error // Keep track of transactions which return errors so they can be removed work.tcount = 0 - work.ignoredTransactors = set.New() - work.lowGasTransactors = set.New() work.ownedAccounts = accountAddressesSet(accounts) if self.current != nil { work.localMinedBlocks = self.current.localMinedBlocks @@ -494,43 +495,8 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { core.ApplyDAOHardFork(work.state) } - - /* //approach 1 - transactions := self.eth.TxPool().GetTransactions() - sort.Sort(types.TxByNonce(transactions)) - */ - - //approach 2 - transactions := types.SortByPriceAndNonce(self.eth.TxPool().Pending()) - - /* // approach 3 - // commit transactions for this run. - txPerOwner := make(map[common.Address]types.Transactions) - // Sort transactions by owner - for _, tx := range self.eth.TxPool().GetTransactions() { - from, _ := tx.From() // we can ignore the sender error - txPerOwner[from] = append(txPerOwner[from], tx) - } - var ( - singleTxOwner types.Transactions - multiTxOwner types.Transactions - ) - // Categorise transactions by - // 1. 1 owner tx per block - // 2. multi txs owner per block - for _, txs := range txPerOwner { - if len(txs) == 1 { - singleTxOwner = append(singleTxOwner, txs[0]) - } else { - multiTxOwner = append(multiTxOwner, txs...) - } - } - sort.Sort(types.TxByPrice(singleTxOwner)) - sort.Sort(types.TxByNonce(multiTxOwner)) - transactions := append(singleTxOwner, multiTxOwner...) - */ - - work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain) + txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending()) + work.commitTransactions(self.mux, txs, self.gasPrice, self.chain) self.eth.TxPool().RemoveBatch(work.lowGasTxs) self.eth.TxPool().RemoveBatch(work.failedTxs) @@ -591,64 +557,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error { return nil } -func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { +func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, gasPrice *big.Int, bc *core.BlockChain) { gp := new(core.GasPool).AddGas(env.header.GasLimit) var coalescedLogs vm.Logs - for _, tx := range transactions { + for { + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } // Error may be ignored here. The error has already been checked // during transaction acceptance is the transaction pool. from, _ := tx.From() - // Check if it falls within margin. Txs from owned accounts are always processed. + // Ignore any transactions (and accounts subsequently) with low gas limits if tx.GasPrice().Cmp(gasPrice) < 0 && !env.ownedAccounts.Has(from) { - // ignore the transaction and transactor. We ignore the transactor - // because nonce will fail after ignoring this transaction so there's - // no point - env.lowGasTransactors.Add(from) - - glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4]) - } + // Pop the current low-priced transaction without shifting in the next from the account + glog.V(logger.Info).Infof("Transaction (%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4]) - // Continue with the next transaction if the transaction sender is included in - // the low gas tx set. This will also remove the tx and all sequential transaction - // from this transactor - if env.lowGasTransactors.Has(from) { - // add tx to the low gas set. This will be removed at the end of the run - // owned accounts are ignored - if !env.ownedAccounts.Has(from) { - env.lowGasTxs = append(env.lowGasTxs, tx) - } - continue - } + env.lowGasTxs = append(env.lowGasTxs, tx) + txs.Pop() - // Move on to the next transaction when the transactor is in ignored transactions set - // This may occur when a transaction hits the gas limit. When a gas limit is hit and - // the transaction is processed (that could potentially be included in the block) it - // will throw a nonce error because the previous transaction hasn't been processed. - // Therefor we need to ignore any transaction after the ignored one. - if env.ignoredTransactors.Has(from) { continue } - + // Start executing the transaction env.state.StartRecord(tx.Hash(), common.Hash{}, 0) err, logs := env.commitTransaction(tx, bc, gp) switch { case core.IsGasLimitErr(err): - // ignore the transactor so no nonce errors will be thrown for this account - // next time the worker is run, they'll be picked up again. - env.ignoredTransactors.Add(from) + // Pop the current out-of-gas transaction without shifting in the next from the account glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) + txs.Pop() case err != nil: + // Pop the current failed transaction without shifting in the next from the account + glog.V(logger.Detail).Infof("Transaction (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) env.failedTxs = append(env.failedTxs, tx) - if glog.V(logger.Detail) { - glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) - } + txs.Pop() + default: - env.tcount++ + // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + txs.Shift() } } if len(coalescedLogs) > 0 || env.tcount > 0 {