diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 4296c79f65..d8debe1c0e 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -61,7 +61,7 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set - queue map[common.Address]types.Transactions + queue map[common.Address]map[common.Hash]*types.Transaction subscribers []chan TxMsg @@ -71,7 +71,7 @@ type TxPool struct { func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), - queue: make(map[common.Address]types.Transactions), + queue: make(map[common.Address]map[common.Hash]*types.Transaction), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, @@ -157,22 +157,20 @@ func (self *TxPool) add(tx *types.Transaction) error { if err != nil { return err } - - self.queueTx(tx) - - var toname string - if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" - } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) + self.queueTx(hash, tx) if glog.V(logger.Debug) { - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) + var toname string + if to := tx.To(); to != nil { + toname = common.Bytes2Hex(to[:4]) + } else { + toname = "[NEW_CONTRACT]" + } + // we can ignore the error here because From is + // verified in ValidateTransaction. + f, _ := tx.From() + from := common.Bytes2Hex(f[:4]) + glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) } return nil @@ -211,16 +209,12 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { if tx, ok := tp.txs[hash]; ok { return tx } - // check queue for _, txs := range tp.queue { - for _, tx := range txs { - if tx.Hash() == hash { - return tx - } + if tx, ok := txs[hash]; ok { + return tx } } - return nil } @@ -234,26 +228,26 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { txs[i] = tx i++ } - - return + return txs } func (self *TxPool) GetQueuedTransactions() types.Transactions { self.mu.RLock() defer self.mu.RUnlock() - var txs types.Transactions - for _, ts := range self.queue { - txs = append(txs, ts...) + var ret types.Transactions + for _, txs := range self.queue { + for _, tx := range txs { + ret = append(ret, tx) + } } - - return txs + sort.Sort(types.TxByNonce{ret}) + return ret } func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() - for _, tx := range txs { self.removeTx(tx.Hash()) } @@ -270,14 +264,17 @@ func (pool *TxPool) Stop() { glog.V(logger.Info).Infoln("TX Pool stopped") } -func (self *TxPool) queueTx(tx *types.Transaction) { +func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { from, _ := tx.From() // already validated - self.queue[from] = append(self.queue[from], tx) + if self.queue[from] == nil { + self.queue[from] = make(map[common.Hash]*types.Transaction) + } + self.queue[from][hash] = tx } -func (pool *TxPool) addTx(tx *types.Transaction) { - if _, ok := pool.txs[tx.Hash()]; !ok { - pool.txs[tx.Hash()] = tx +func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { + if _, ok := pool.txs[hash]; !ok { + pool.txs[hash] = tx // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. @@ -291,36 +288,33 @@ func (pool *TxPool) checkQueue() { defer pool.mu.Unlock() statedb := pool.currentState() + var addq txQueue for address, txs := range pool.queue { - sort.Sort(types.TxByNonce{txs}) - - var ( - nonce = statedb.GetNonce(address) - start int - ) - // Clean up the transactions first and determine the start of the nonces - for _, tx := range txs { - if tx.Nonce() >= nonce { - break + curnonce := statedb.GetNonce(address) + addq := addq[:0] + for hash, tx := range txs { + if tx.AccountNonce < curnonce { + // Drop queued transactions whose nonce is lower than + // the account nonce because they have been processed. + delete(txs, hash) + } else { + // Collect the remaining transactions for the next pass. + addq = append(addq, txQueueEntry{hash, tx}) } - start++ } - pool.queue[address] = txs[start:] - - // expected nonce - enonce := nonce - for _, tx := range pool.queue[address] { - // If the expected nonce does not match up with the next one - // (i.e. a nonce gap), we stop the loop - if enonce != tx.Nonce() { + // Find the next consecutive nonce range starting at the + // current account nonce. + sort.Sort(addq) + for _, e := range addq { + if e.AccountNonce != curnonce { break } - enonce++ - - pool.addTx(tx) + curnonce++ + delete(txs, e.hash) + pool.addTx(e.hash, e.Transaction) } - // delete the entire queue entry if it's empty. There's no need to keep it - if len(pool.queue[address]) == 0 { + // Delete the entire queue entry if it became empty. + if len(txs) == 0 { delete(pool.queue, address) } } @@ -329,20 +323,16 @@ func (pool *TxPool) checkQueue() { func (pool *TxPool) removeTx(hash common.Hash) { // delete from pending pool delete(pool.txs, hash) - // delete from queue -out: for address, txs := range pool.queue { - for i, tx := range txs { - if tx.Hash() == hash { - if len(txs) == 1 { - // if only one tx, remove entire address entry - delete(pool.queue, address) - } else { - pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...) - } - break out + if _, ok := txs[hash]; ok { + if len(txs) == 1 { + // if only one tx, remove entire address entry. + delete(pool.queue, address) + } else { + delete(txs, hash) } + break } } } @@ -356,8 +346,18 @@ func (pool *TxPool) validatePool() { if glog.V(logger.Info) { glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err) } - - pool.removeTx(hash) + delete(pool.txs, hash) } } } + +type txQueue []txQueueEntry + +type txQueueEntry struct { + hash common.Hash + *types.Transaction +} + +func (q txQueue) Len() int { return len(q) } +func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } +func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index d6ea4a2a9b..600fd9b4fc 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -68,7 +68,7 @@ func TestTransactionQueue(t *testing.T) { tx.SignECDSA(key) from, _ := tx.From() pool.currentState().AddBalance(from, big.NewInt(1)) - pool.queueTx(tx) + pool.queueTx(tx.Hash(), tx) pool.checkQueue() if len(pool.txs) != 1 { @@ -80,7 +80,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = tx.From() pool.currentState().SetNonce(from, 10) tx.SetNonce(1) - pool.queueTx(tx) + pool.queueTx(tx.Hash(), tx) pool.checkQueue() if _, ok := pool.txs[tx.Hash()]; ok { t.Error("expected transaction to be in tx pool") @@ -97,18 +97,18 @@ func TestTransactionQueue(t *testing.T) { tx1.SignECDSA(key) tx2.SignECDSA(key) tx3.SignECDSA(key) - pool.queueTx(tx1) - pool.queueTx(tx2) - pool.queueTx(tx3) + pool.queueTx(tx1.Hash(), tx1) + pool.queueTx(tx2.Hash(), tx2) + pool.queueTx(tx3.Hash(), tx3) from, _ = tx1.From() + pool.checkQueue() if len(pool.txs) != 1 { t.Error("expected tx pool to be 1 =") } - - if len(pool.queue[from]) != 3 { - t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + if len(pool.queue[from]) != 2 { + t.Error("expected len(queue) == 2, got", len(pool.queue[from])) } } @@ -118,8 +118,8 @@ func TestRemoveTx(t *testing.T) { tx.SignECDSA(key) from, _ := tx.From() pool.currentState().AddBalance(from, big.NewInt(1)) - pool.queueTx(tx) - pool.addTx(tx) + pool.queueTx(tx.Hash(), tx) + pool.addTx(tx.Hash(), tx) if len(pool.queue) != 1 { t.Error("expected queue to be 1, got", len(pool.queue)) }