core: use atomic type (#27011)

pull/27014/head
s7v7nislands 2 years ago committed by GitHub
parent a03490c6b2
commit 949cee2fe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      core/blockchain.go
  2. 6
      core/chain_indexer.go
  3. 4
      core/state_prefetcher.go
  4. 4
      core/types.go

@ -174,7 +174,7 @@ type BlockChain struct {
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed lastWrite uint64 // Last block when the state was flushed
flushInterval int64 // Time interval (processing time) after which to flush a state flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes. triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache) stateCache state.Database // State database to reuse between imports (contains state cache)
@ -215,8 +215,8 @@ type BlockChain struct {
wg sync.WaitGroup // wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop. quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt int32 // interrupt signaler for block processing procInterrupt atomic.Bool // interrupt signaler for block processing
engine consensus.Engine engine consensus.Engine
validator Validator // Block and state validator interface validator Validator // Block and state validator interface
@ -260,7 +260,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig: cacheConfig, cacheConfig: cacheConfig,
db: db, db: db,
triedb: triedb, triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New[int64, common.Hash](nil), triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}), quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(), chainmu: syncx.NewClosableMutex(),
@ -273,6 +272,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
engine: engine, engine: engine,
vmConfig: vmConfig, vmConfig: vmConfig,
} }
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
bc.forker = NewForkChoice(bc, shouldPreserve) bc.forker = NewForkChoice(bc, shouldPreserve)
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.validator = NewBlockValidator(chainConfig, bc, engine)
@ -916,7 +916,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// This method has been exposed to allow tests to stop the blockchain while simulating // This method has been exposed to allow tests to stop the blockchain while simulating
// a crash. // a crash.
func (bc *BlockChain) stopWithoutSaving() { func (bc *BlockChain) stopWithoutSaving() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { if !bc.stopping.CompareAndSwap(false, true) {
return return
} }
@ -998,12 +998,12 @@ func (bc *BlockChain) Stop() {
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after // errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method. // calling this method.
func (bc *BlockChain) StopInsert() { func (bc *BlockChain) StopInsert() {
atomic.StoreInt32(&bc.procInterrupt, 1) bc.procInterrupt.Store(true)
} }
// insertStopped returns true after StopInsert has been called. // insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool { func (bc *BlockChain) insertStopped() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1 return bc.procInterrupt.Load()
} }
func (bc *BlockChain) procFutureBlocks() { func (bc *BlockChain) procFutureBlocks() {
@ -1382,7 +1382,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} }
// Find the next state trie we need to commit // Find the next state trie we need to commit
chosen := current - TriesInMemory chosen := current - TriesInMemory
flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval)) flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded time allowance, flush an entire trie to disk // If we exceeded time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval { if bc.gcproc > flushInterval {
// If the header is missing (canonical chain behind), we're reorging a low // If the header is missing (canonical chain behind), we're reorging a low
@ -1735,7 +1735,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
// If we have a followup block, run that against the current state to pre-cache // If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes. // transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32 var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch { if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil { if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
@ -1744,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
blockPrefetchExecuteTimer.Update(time.Since(start)) blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(&followupInterrupt) == 1 { if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1) blockPrefetchInterruptMeter.Mark(1)
} }
}(time.Now(), followup, throwaway) }(time.Now(), followup, throwaway)
@ -1756,7 +1756,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1) followupInterrupt.Store(true)
return it.index, err return it.index, err
} }
ptime := time.Since(pstart) ptime := time.Since(pstart)
@ -1764,7 +1764,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
vstart := time.Now() vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1) followupInterrupt.Store(true)
return it.index, err return it.index, err
} }
vtime := time.Since(vstart) vtime := time.Since(vstart)
@ -1797,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
} else { } else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false) status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
} }
atomic.StoreUint32(&followupInterrupt, 1) followupInterrupt.Store(true)
if err != nil { if err != nil {
return it.index, err return it.index, err
} }
@ -2497,5 +2497,5 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
// The interval is in terms of block processing time, not wall clock. // The interval is in terms of block processing time, not wall clock.
// It is thread-safe and can be called repeatedly without side effects. // It is thread-safe and can be called repeatedly without side effects.
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
atomic.StoreInt64(&bc.flushInterval, int64(interval)) bc.flushInterval.Store(int64(interval))
} }

@ -75,7 +75,7 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to children []*ChainIndexer // Child indexers to cascade chain updates to
active uint32 // Flag whether the event loop was started active atomic.Bool // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines quit chan chan error // Quit channel to tear down running goroutines
ctx context.Context ctx context.Context
@ -166,7 +166,7 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err) errs = append(errs, err)
} }
// If needed, tear down the secondary event loop // If needed, tear down the secondary event loop
if atomic.LoadUint32(&c.active) != 0 { if c.active.Load() {
c.quit <- errc c.quit <- errc
if err := <-errc; err != nil { if err := <-errc; err != nil {
errs = append(errs, err) errs = append(errs, err)
@ -196,7 +196,7 @@ func (c *ChainIndexer) Close() error {
// queue. // queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) { func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown // Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1) c.active.Store(true)
defer sub.Unsubscribe() defer sub.Unsubscribe()

@ -47,7 +47,7 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse
// Prefetch processes the state changes according to the Ethereum rules by running // Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The // the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes. // only goal is to pre-cache transaction signatures and state trie nodes.
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) { func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) {
var ( var (
header = block.Header() header = block.Header()
gaspool = new(GasPool).AddGas(block.GasLimit()) gaspool = new(GasPool).AddGas(block.GasLimit())
@ -59,7 +59,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
byzantium := p.config.IsByzantium(block.Number()) byzantium := p.config.IsByzantium(block.Number())
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
// If block precaching was interrupted, abort // If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { if interrupt != nil && interrupt.Load() {
return return
} }
// Convert the transaction into an executable message and pre-cache its sender // Convert the transaction into an executable message and pre-cache its sender

@ -17,6 +17,8 @@
package core package core
import ( import (
"sync/atomic"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -39,7 +41,7 @@ type Prefetcher interface {
// Prefetch processes the state changes according to the Ethereum rules by running // Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The // the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes. // only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool)
} }
// Processor is an interface for processing blocks using a given initial state. // Processor is an interface for processing blocks using a given initial state.

Loading…
Cancel
Save