core: reduce CPU load by reducing calls to checkQueue

* Reduced maxQueue count
* Added proper deletion past maxQueue limit
* Added cheap stats method to txpool

queueCheck was called for **every** transaction instead of:
1. add all txs
2. check queue

previously

1. add txs[i]
2. check queue
3. if i < len(txs) goto 1.
release/0.9.36
Jeffrey Wilcke 10 years ago
parent 9226369b5d
commit 61ca780f3b
  1. 75
      core/transaction_pool.go
  2. 2
      core/transaction_pool_test.go
  3. 5
      rpc/api/txpool.go

@ -29,7 +29,7 @@ var (
) )
const ( const (
maxQueued = 200 // max limit of queued txs per address maxQueued = 64 // max limit of queued txs per address
) )
type stateFn func() *state.StateDB type stateFn func() *state.StateDB
@ -129,6 +129,17 @@ func (pool *TxPool) State() *state.ManagedState {
return pool.pendingState return pool.pendingState
} }
func (pool *TxPool) Stats() (pending int, queued int) {
pool.mu.RLock()
defer pool.mu.RUnlock()
pending = len(pool.pending)
for _, txs := range pool.queue {
queued += len(txs)
}
return
}
// validateTx checks whether a transaction is valid according // validateTx checks whether a transaction is valid according
// to the consensus rules. // to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error { func (pool *TxPool) validateTx(tx *types.Transaction) error {
@ -214,9 +225,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
} }
// check and validate the queueue
self.checkQueue()
return nil return nil
} }
@ -245,11 +253,17 @@ func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Trans
} }
// Add queues a single transaction in the pool if it is valid. // Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error { func (self *TxPool) Add(tx *types.Transaction) (err error) {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
return self.add(tx) err = self.add(tx)
if err == nil {
// check and validate the queueue
self.checkQueue()
}
return
} }
// AddTransactions attempts to queue all valid transactions in txs. // AddTransactions attempts to queue all valid transactions in txs.
@ -265,6 +279,9 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
glog.V(logger.Debug).Infof("tx %x\n", h[:4]) glog.V(logger.Debug).Infof("tx %x\n", h[:4])
} }
} }
// check and validate the queueue
self.checkQueue()
} }
// GetTransaction returns a transaction if it is contained in the pool // GetTransaction returns a transaction if it is contained in the pool
@ -327,6 +344,23 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
} }
} }
func (pool *TxPool) removeTx(hash common.Hash) {
// delete from pending pool
delete(pool.pending, hash)
// delete from queue
for address, txs := range pool.queue {
if _, ok := txs[hash]; ok {
if len(txs) == 1 {
// if only one tx, remove entire address entry.
delete(pool.queue, address)
} else {
delete(txs, hash)
}
break
}
}
}
// checkQueue moves transactions that have become processable to main pool. // checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() { func (pool *TxPool) checkQueue() {
state := pool.pendingState state := pool.pendingState
@ -354,13 +388,19 @@ func (pool *TxPool) checkQueue() {
for i, e := range addq { for i, e := range addq {
// start deleting the transactions from the queue if they exceed the limit // start deleting the transactions from the queue if they exceed the limit
if i > maxQueued { if i > maxQueued {
if glog.V(logger.Debug) {
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
}
delete(pool.queue[address], e.hash) delete(pool.queue[address], e.hash)
continue continue
} }
if e.Nonce() > guessedNonce { if e.Nonce() > guessedNonce {
if len(addq)-i > maxQueued {
if glog.V(logger.Debug) {
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
}
for j := i + maxQueued; j < len(addq); j++ {
delete(txs, addq[j].hash)
}
}
break break
} }
delete(txs, e.hash) delete(txs, e.hash)
@ -373,23 +413,6 @@ func (pool *TxPool) checkQueue() {
} }
} }
func (pool *TxPool) removeTx(hash common.Hash) {
// delete from pending pool
delete(pool.pending, hash)
// delete from queue
for address, txs := range pool.queue {
if _, ok := txs[hash]; ok {
if len(txs) == 1 {
// if only one tx, remove entire address entry.
delete(pool.queue, address)
} else {
delete(txs, hash)
}
break
}
}
}
// validatePool removes invalid and processed transactions from the main pool. // validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() { func (pool *TxPool) validatePool() {
state := pool.currentState() state := pool.currentState()

@ -181,6 +181,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
if err := pool.add(tx2); err != nil { if err := pool.add(tx2); err != nil {
t.Error("didn't expect error", err) t.Error("didn't expect error", err)
} }
pool.checkQueue()
if len(pool.pending) != 2 { if len(pool.pending) != 2 {
t.Error("expected 2 pending txs. Got", len(pool.pending)) t.Error("expected 2 pending txs. Got", len(pool.pending))
} }

@ -68,8 +68,9 @@ func (self *txPoolApi) ApiVersion() string {
} }
func (self *txPoolApi) Status(req *shared.Request) (interface{}, error) { func (self *txPoolApi) Status(req *shared.Request) (interface{}, error) {
pending, queue := self.ethereum.TxPool().Stats()
return map[string]int{ return map[string]int{
"pending": self.ethereum.TxPool().GetTransactions().Len(), "pending": pending,
"queued": self.ethereum.TxPool().GetQueuedTransactions().Len(), "queued": queue,
}, nil }, nil
} }

Loading…
Cancel
Save