Merge pull request #15095 from karalabe/txpool-avoid-deep-reorg

core: use blocks and avoid deep reorgs in txpool
pull/15103/head
Péter Szilágyi 7 years ago committed by GitHub
commit 160add8570
  1. 84
      core/tx_pool.go
  2. 16
      core/tx_pool_test.go

@ -19,6 +19,7 @@ package core
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"math/big" "math/big"
"sort" "sort"
"sync" "sync"
@ -105,11 +106,11 @@ var (
// blockChain provides the state of blockchain and current gas limit to do // blockChain provides the state of blockchain and current gas limit to do
// some pre checks in tx pool and event subscribers. // some pre checks in tx pool and event subscribers.
type blockChain interface { type blockChain interface {
CurrentHeader() *types.Header CurrentBlock() *types.Block
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
GetBlock(hash common.Hash, number uint64) *types.Block GetBlock(hash common.Hash, number uint64) *types.Block
StateAt(root common.Hash) (*state.StateDB, error) StateAt(root common.Hash) (*state.StateDB, error)
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
} }
// TxPoolConfig are the configuration parameters of the transaction pool. // TxPoolConfig are the configuration parameters of the transaction pool.
@ -223,7 +224,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
} }
pool.locals = newAccountSet(pool.signer) pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all) pool.priced = newTxPricedList(&pool.all)
pool.reset(nil, chain.CurrentHeader()) pool.reset(nil, chain.CurrentBlock().Header())
// If local transactions and journaling is enabled, load from disk // If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" { if !config.NoLocals && config.Journal != "" {
@ -265,7 +266,7 @@ func (pool *TxPool) loop() {
defer journal.Stop() defer journal.Stop()
// Track the previous head headers for transaction reorgs // Track the previous head headers for transaction reorgs
head := pool.chain.CurrentHeader() head := pool.chain.CurrentBlock()
// Keep waiting for and reacting to the various events // Keep waiting for and reacting to the various events
for { for {
@ -277,8 +278,8 @@ func (pool *TxPool) loop() {
if pool.chainconfig.IsHomestead(ev.Block.Number()) { if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true pool.homestead = true
} }
pool.reset(head, ev.Block.Header()) pool.reset(head.Header(), ev.Block.Header())
head = ev.Block.Header() head = ev.Block
pool.mu.Unlock() pool.mu.Unlock()
} }
@ -344,43 +345,52 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
var reinject types.Transactions var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash { if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
var discarded, included types.Transactions // If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
var ( newNum := newHead.Number.Uint64()
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
) log.Warn("Skipping deep transaction reorg", "depth", depth)
for rem.NumberU64() > add.NumberU64() { } else {
discarded = append(discarded, rem.Transactions()...) // Reorg seems shallow enough to pull in all transactions into memory
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { var discarded, included types.Transactions
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return var (
} rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
} add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
for add.NumberU64() > rem.NumberU64() { )
included = append(included, add.Transactions()...) for rem.NumberU64() > add.NumberU64() {
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { discarded = append(discarded, rem.Transactions()...)
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
return log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
} }
} for add.NumberU64() > rem.NumberU64() {
for rem.Hash() != add.Hash() { included = append(included, add.Transactions()...)
discarded = append(discarded, rem.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) return
return }
} }
included = append(included, add.Transactions()...) for rem.Hash() != add.Hash() {
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { discarded = append(discarded, rem.Transactions()...)
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
return log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
} }
reinject = types.TxDifference(discarded, included)
} }
reinject = types.TxDifference(discarded, included)
} }
// Initialize the internal state to the current head // Initialize the internal state to the current head
if newHead == nil { if newHead == nil {
newHead = pool.chain.CurrentHeader() // Special case during testing newHead = pool.chain.CurrentBlock().Header() // Special case during testing
} }
statedb, err := pool.chain.StateAt(newHead.Root) statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil { if err != nil {

@ -50,24 +50,24 @@ type testBlockChain struct {
chainHeadFeed *event.Feed chainHeadFeed *event.Feed
} }
func (bc *testBlockChain) CurrentHeader() *types.Header { func (bc *testBlockChain) CurrentBlock() *types.Block {
return &types.Header{ return types.NewBlock(&types.Header{
GasLimit: bc.gasLimit, GasLimit: bc.gasLimit,
} }, nil, nil, nil)
}
func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.chainHeadFeed.Subscribe(ch)
} }
func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
return types.NewBlock(bc.CurrentHeader(), nil, nil, nil) return bc.CurrentBlock()
} }
func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
return bc.statedb, nil return bc.statedb, nil
} }
func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.chainHeadFeed.Subscribe(ch)
}
func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) return pricedTransaction(nonce, gaslimit, big.NewInt(1), key)
} }

Loading…
Cancel
Save