core/state: blocking prefetcher on term signal, parallel updates (#29519)

* core/state: trie prefetcher change: calling trie() doesn't stop the associated subfetcher

Co-authored-by: Martin HS <martin@swende.se>
Co-authored-by: Péter Szilágyi <peterke@gmail.com>

* core/state: improve prefetcher

* core/state: restore async prefetcher task scheduling

* core/state: finish prefetching async and process storage updates async

* core/state: don't use the prefetcher for missing snapshot items

* core/state: remove update concurrency for Verkle tries

* core/state: add some termination checks to prefetcher async shutdowns

* core/state: differentiate db tries and prefetched tries

* core/state: teh teh teh

---------

Co-authored-by: Jared Wasinger <j-wasinger@hotmail.com>
Co-authored-by: Martin HS <martin@swende.se>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
Péter Szilágyi committed 2ac83e197b (parent 44a50c9f96)
Changed files:

  1. core/blockchain.go (8 changed lines)
  2. core/state/state_object.go (97 changed lines)
  3. core/state/statedb.go (106 changed lines)
  4. core/state/trie_prefetcher.go (292 changed lines)
  5. core/state/trie_prefetcher_test.go (65 changed lines)

@@ -1805,8 +1805,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
     }
     statedb.SetLogger(bc.logger)

-    // Enable prefetching to pull in trie node paths while processing transactions
-    statedb.StartPrefetcher("chain")
+    // If we are past Byzantium, enable prefetching to pull in trie node paths
+    // while processing transactions. Before Byzantium the prefetcher is mostly
+    // useless due to the intermediate root hashing after each transaction.
+    if bc.chainConfig.IsByzantium(block.Number()) {
+        statedb.StartPrefetcher("chain")
+    }
     activeState = statedb

     // If we have a followup block, run that against the current state to pre-cache

@@ -21,6 +21,7 @@ import (
     "fmt"
     "io"
     "maps"
+    "sync"
    "time"

     "github.com/ethereum/go-ethereum/common"
@@ -33,6 +34,14 @@ import (
     "github.com/holiman/uint256"
 )

+// hasherPool holds a pool of hashers used by state objects during concurrent
+// trie updates.
+var hasherPool = sync.Pool{
+    New: func() interface{} {
+        return crypto.NewKeccakState()
+    },
+}
+
 type Storage map[common.Hash]common.Hash

 func (s Storage) Copy() Storage {
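Since the diff only shows the pool declaration, here is a minimal, self-contained sketch of the pooled-hasher pattern it relies on. It uses the stock golang.org/x/crypto/sha3 hasher instead of geth's crypto.NewKeccakState, and all names are illustrative:

```go
package main

import (
	"fmt"
	"hash"
	"sync"

	"golang.org/x/crypto/sha3"
)

// hasherPool amortizes hasher allocations across goroutines: Get reuses an
// idle hasher when one was returned earlier, New allocates otherwise.
var hasherPool = sync.Pool{
	New: func() interface{} { return sha3.NewLegacyKeccak256() },
}

func keccak256(data []byte) []byte {
	h := hasherPool.Get().(hash.Hash)
	defer hasherPool.Put(h)
	h.Reset() // the pooled hasher may hold state from a previous use
	h.Write(data)
	return h.Sum(nil)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) { // each goroutine grabs its own hasher from the pool
			defer wg.Done()
			fmt.Printf("%x\n", keccak256([]byte{byte(n)}))
		}(i)
	}
	wg.Wait()
}
```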
@@ -118,27 +127,39 @@ func (s *stateObject) touch() {
     }
 }

-// getTrie returns the associated storage trie. The trie will be opened
-// if it's not loaded previously. An error will be returned if trie can't
-// be loaded.
+// getTrie returns the associated storage trie. The trie will be opened if it's
+// not loaded previously. An error will be returned if the trie can't be loaded.
+//
+// If a new trie is opened, it will be cached within the state object to allow
+// subsequent reads to expand the same trie instead of reloading from disk.
 func (s *stateObject) getTrie() (Trie, error) {
     if s.trie == nil {
-        // Try fetching from prefetcher first
-        if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil {
-            // When the miner is creating the pending state, there is no prefetcher
-            s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
-        }
-        if s.trie == nil {
-            tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
-            if err != nil {
-                return nil, err
-            }
-            s.trie = tr
+        tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
+        if err != nil {
+            return nil, err
         }
+        s.trie = tr
     }
     return s.trie, nil
 }

+// getPrefetchedTrie returns the associated trie, as populated by the prefetcher
+// if it's available.
+//
+// Note, as opposed to getTrie, this method will *NOT* blindly cache the
+// resulting trie in the state object. The caller might want to do that, but
+// it's cleaner to break the hidden interdependency between retrieving tries
+// from the db or from the prefetcher.
+func (s *stateObject) getPrefetchedTrie() (Trie, error) {
+    // If there's nothing to meaningfully return, let the user figure it out by
+    // pulling the trie from disk.
+    if s.data.Root == types.EmptyRootHash || s.db.prefetcher == nil {
+        return nil, nil
+    }
+    // Attempt to retrieve the trie from the prefetcher
+    return s.db.prefetcher.trie(s.addrHash, s.data.Root)
+}
+
 // GetState retrieves a value from the account storage trie.
 func (s *stateObject) GetState(key common.Hash) common.Hash {
     value, _ := s.getState(key)
@@ -248,7 +269,7 @@ func (s *stateObject) setState(key common.Hash, value common.Hash, origin common.Hash) {
 // finalise moves all dirty storage slots into the pending area to be hashed or
 // committed later. It is invoked at the end of every transaction.
-func (s *stateObject) finalise(prefetch bool) {
+func (s *stateObject) finalise() {
     slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
     for key, value := range s.dirtyStorage {
         // If the slot is different from its original value, move it into the
@@ -263,8 +284,10 @@ func (s *stateObject) finalise(prefetch bool) {
             delete(s.pendingStorage, key)
         }
     }
-    if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
-        s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
+    if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
+        if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
+            log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
+        }
     }
     if len(s.dirtyStorage) > 0 {
         s.dirtyStorage = make(Storage)
@@ -283,25 +306,43 @@ func (s *stateObject) finalise(prefetch bool) {
 // storage change at all.
 func (s *stateObject) updateTrie() (Trie, error) {
     // Make sure all dirty slots are finalized into the pending storage area
-    s.finalise(false)
+    s.finalise()

     // Short circuit if nothing changed, don't bother with hashing anything
     if len(s.pendingStorage) == 0 {
         return s.trie, nil
     }
+    // Retrieve a prefetcher-populated trie, or fall back to the database
+    tr, err := s.getPrefetchedTrie()
+    switch {
+    case err != nil:
+        // Fetcher retrieval failed, something's very wrong, abort
+        s.db.setError(err)
+        return nil, err
+
+    case tr == nil:
+        // Fetcher not running or empty trie, fall back to the database trie
+        tr, err = s.getTrie()
+        if err != nil {
+            s.db.setError(err)
+            return nil, err
+        }
+
+    default:
+        // Prefetcher returned a live trie, swap it out for the current one
+        s.trie = tr
+    }
     // The snapshot storage map for the object
     var (
         storage map[common.Hash][]byte
         origin  map[common.Hash][]byte
     )
-    tr, err := s.getTrie()
-    if err != nil {
-        s.db.setError(err)
-        return nil, err
-    }
     // Insert all the pending storage updates into the trie
     usedStorage := make([][]byte, 0, len(s.pendingStorage))
+
+    hasher := hasherPool.Get().(crypto.KeccakState)
+    defer hasherPool.Put(hasher)
+
     // Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
     // in circumstances similar to the following:
     //
@@ -330,26 +371,30 @@ func (s *stateObject) updateTrie() (Trie, error) {
             s.db.setError(err)
             return nil, err
         }
-        s.db.StorageUpdated += 1
+        s.db.StorageUpdated.Add(1)
     } else {
         deletions = append(deletions, key)
     }
     // Cache the mutated storage slots until commit
     if storage == nil {
+        s.db.storagesLock.Lock()
         if storage = s.db.storages[s.addrHash]; storage == nil {
             storage = make(map[common.Hash][]byte)
             s.db.storages[s.addrHash] = storage
         }
+        s.db.storagesLock.Unlock()
     }
-    khash := crypto.HashData(s.db.hasher, key[:])
+    khash := crypto.HashData(hasher, key[:])
     storage[khash] = encoded // encoded will be nil if it's deleted
     // Cache the original value of mutated storage slots
     if origin == nil {
+        s.db.storagesLock.Lock()
         if origin = s.db.storagesOrigin[s.address]; origin == nil {
             origin = make(map[common.Hash][]byte)
             s.db.storagesOrigin[s.address] = origin
         }
+        s.db.storagesLock.Unlock()
     }
     // Track the original value of slot only if it's mutated first time
     if _, ok := origin[khash]; !ok {
@@ -369,7 +414,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
             s.db.setError(err)
             return nil, err
         }
-        s.db.StorageDeleted += 1
+        s.db.StorageDeleted.Add(1)
     }
     // If no slots were touched, issue a warning as we shouldn't have done all
     // the above work in the first place
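The two changes above — atomic counters and a storagesLock around the maps shared across state objects — are what make updateTrie safe to run from several goroutines at once. A minimal sketch of that pattern, with hypothetical names rather than geth's:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// db mimics the shared StateDB state touched by concurrent updateTrie calls:
// plain int counters become atomics, shared maps get a mutex.
type db struct {
	storageUpdated atomic.Int64 // safe to Add from many goroutines
	storagesLock   sync.Mutex   // guards the map below
	storages       map[string]map[string][]byte
}

func main() {
	d := &db{storages: make(map[string]map[string][]byte)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) { // one goroutine per "state object"
			defer wg.Done()
			d.storageUpdated.Add(1) // lock-free metric bump
			addr := fmt.Sprintf("contract-%d", n%2)
			d.storagesLock.Lock() // map access needs mutual exclusion
			if d.storages[addr] == nil {
				d.storages[addr] = make(map[string][]byte)
			}
			d.storages[addr][fmt.Sprintf("slot-%d", n)] = []byte{byte(n)}
			d.storagesLock.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println("updates:", d.storageUpdated.Load(), "contracts:", len(d.storages))
}
```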

@@ -24,6 +24,7 @@ import (
     "slices"
     "sort"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/ethereum/go-ethereum/common"
@@ -96,10 +97,12 @@ type StateDB struct {
     // These maps hold the state changes (including the corresponding
     // original value) that occurred in this **block**.
     accounts       map[common.Hash][]byte                    // The mutated accounts in 'slim RLP' encoding
-    accountsOrigin map[common.Address][]byte                 // The original value of mutated accounts in 'slim RLP' encoding
     storages       map[common.Hash]map[common.Hash][]byte    // The mutated slots in prefix-zero trimmed rlp format
+
+    accountsOrigin map[common.Address][]byte                 // The original value of mutated accounts in 'slim RLP' encoding
     storagesOrigin map[common.Address]map[common.Hash][]byte // The original value of mutated slots in prefix-zero trimmed rlp format
+    storagesLock   sync.Mutex                                // Mutex protecting the maps during concurrent updates/commits

     // This map holds 'live' objects, which will get modified while
     // processing a state transition.
@@ -165,9 +168,9 @@ type StateDB struct {
     TrieDBCommits time.Duration

     AccountUpdated int
-    StorageUpdated int
+    StorageUpdated atomic.Int64
     AccountDeleted int
-    StorageDeleted int
+    StorageDeleted atomic.Int64

     // Testing hooks
     onCommit func(states *triestate.Set) // Hook invoked when commit is performed
@@ -214,7 +217,8 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
 // commit phase, most of the needed data is already hot.
 func (s *StateDB) StartPrefetcher(namespace string) {
     if s.prefetcher != nil {
-        s.prefetcher.close()
+        s.prefetcher.terminate(false)
+        s.prefetcher.report()
         s.prefetcher = nil
     }
     if s.snap != nil {
@@ -226,7 +230,8 @@ func (s *StateDB) StartPrefetcher(namespace string) {
 // from the gathered metrics.
 func (s *StateDB) StopPrefetcher() {
     if s.prefetcher != nil {
-        s.prefetcher.close()
+        s.prefetcher.terminate(false)
+        s.prefetcher.report()
         s.prefetcher = nil
     }
 }
@@ -544,9 +549,6 @@ func (s *StateDB) GetTransientState(addr common.Address, key common.Hash) common.Hash {
 // updateStateObject writes the given object to the trie.
 func (s *StateDB) updateStateObject(obj *stateObject) {
-    // Track the amount of time wasted on updating the account from the trie
-    defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
-
     // Encode the account and update the account trie
     addr := obj.Address()
     if err := s.trie.UpdateAccount(addr, &obj.data); err != nil {
@@ -575,10 +577,6 @@ func (s *StateDB) updateStateObject(obj *stateObject) {
 // deleteStateObject removes the given object from the state trie.
 func (s *StateDB) deleteStateObject(addr common.Address) {
-    // Track the amount of time wasted on deleting the account from the trie
-    defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
-
-    // Delete the account from the trie
     if err := s.trie.DeleteAccount(addr); err != nil {
         s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err))
     }
@@ -743,13 +741,6 @@ func (s *StateDB) Copy() *StateDB {
     // in the middle of a transaction.
     state.accessList = s.accessList.Copy()
     state.transientStorage = s.transientStorage.Copy()
-
-    // If there's a prefetcher running, make an inactive copy of it that can
-    // only access data but does not actively preload (since the user will not
-    // know that they need to explicitly terminate an active copy).
-    if s.prefetcher != nil {
-        state.prefetcher = s.prefetcher.copy()
-    }
     return state
 }
@@ -820,7 +811,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
             delete(s.accountsOrigin, obj.address) // Clear out any previously updated account data (may be recreated via a resurrect)
             delete(s.storagesOrigin, obj.address) // Clear out any previously updated storage data (may be recreated via a resurrect)
         } else {
-            obj.finalise(true) // Prefetch slots in the background
+            obj.finalise()
             s.markUpdate(addr)
         }
         // At this point, also ship the address off to the precacher. The precacher
@@ -829,7 +820,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
         addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
     }
     if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
-        s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch)
+        if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch); err != nil {
+            log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
+        }
     }
     // Invalidate journal because reverting across transactions is not allowed.
     s.clearJournalAndRefund()
@@ -842,42 +835,52 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
     // Finalise all the dirty storage states and write them into the tries
     s.Finalise(deleteEmptyObjects)

-    // If there was a trie prefetcher operating, it gets aborted and irrevocably
-    // modified after we start retrieving tries. Remove it from the statedb after
-    // this round of use.
-    //
-    // This is weird pre-byzantium since the first tx runs with a prefetcher and
-    // the remainder without, but pre-byzantium even the initial prefetcher is
-    // useless, so no sleep lost.
-    prefetcher := s.prefetcher
+    // If there was a trie prefetcher operating, terminate it async so that the
+    // individual storage tries can be updated as soon as the disk load finishes.
     if s.prefetcher != nil {
+        s.prefetcher.terminate(true)
         defer func() {
-            s.prefetcher.close()
-            s.prefetcher = nil
+            s.prefetcher.report()
+            s.prefetcher = nil // Pre-byzantium, unset any used up prefetcher
         }()
     }
-    // Although naively it makes sense to retrieve the account trie and then do
-    // the contract storage and account updates sequentially, that short circuits
-    // the account prefetcher. Instead, let's process all the storage updates
-    // first, giving the account prefetches just a few more milliseconds of time
-    // to pull useful data from disk.
-    start := time.Now()
+    // Process all storage updates concurrently. The state object update root
+    // method will internally call a blocking trie fetch from the prefetcher,
+    // so there's no need to explicitly wait for the prefetchers to finish.
+    var (
+        start   = time.Now()
+        workers errgroup.Group
+    )
+    if s.db.TrieDB().IsVerkle() {
+        // Whilst MPT storage tries are independent, Verkle has one single trie
+        // for all the accounts and all the storage slots merged together. The
+        // former can thus be simply parallelized, but updating the latter will
+        // need concurrency support within the trie itself. That's a TODO for a
+        // later time.
+        workers.SetLimit(1)
+    }
     for addr, op := range s.mutations {
-        if op.applied {
-            continue
-        }
-        if op.isDelete() {
+        if op.applied || op.isDelete() {
             continue
         }
-        s.stateObjects[addr].updateRoot()
+        obj := s.stateObjects[addr] // closure for the task runner below
+        workers.Go(func() error {
+            obj.updateRoot()
+            return nil
+        })
     }
+    workers.Wait()
     s.StorageUpdates += time.Since(start)

     // Now we're about to start to write changes to the trie. The trie is so far
     // _untouched_. We can check with the prefetcher, if it can give us a trie
     // which has the same root, but also has some content loaded into it.
-    if prefetcher != nil {
-        if trie := prefetcher.trie(common.Hash{}, s.originalRoot); trie != nil {
+    start = time.Now()
+
+    if s.prefetcher != nil {
+        if trie, err := s.prefetcher.trie(common.Hash{}, s.originalRoot); err != nil {
+            log.Error("Failed to retrieve account pre-fetcher trie", "err", err)
+        } else if trie != nil {
             s.trie = trie
         }
     }
@@ -913,8 +916,10 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
         s.deleteStateObject(deletedAddr)
         s.AccountDeleted += 1
     }
-    if prefetcher != nil {
-        prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
+    s.AccountUpdates += time.Since(start)
+
+    if s.prefetcher != nil {
+        s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
     }
     // Track the amount of time wasted on hashing the account trie
     defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now())
@@ -1255,15 +1260,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, error) {
         return common.Hash{}, err
     }
     accountUpdatedMeter.Mark(int64(s.AccountUpdated))
-    storageUpdatedMeter.Mark(int64(s.StorageUpdated))
+    storageUpdatedMeter.Mark(s.StorageUpdated.Load())
     accountDeletedMeter.Mark(int64(s.AccountDeleted))
-    storageDeletedMeter.Mark(int64(s.StorageDeleted))
+    storageDeletedMeter.Mark(s.StorageDeleted.Load())
     accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
     accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
     storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
     storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
     s.AccountUpdated, s.AccountDeleted = 0, 0
-    s.StorageUpdated, s.StorageDeleted = 0, 0
+    s.StorageUpdated.Store(0)
+    s.StorageDeleted.Store(0)

     // If snapshotting is enabled, update the snapshot tree with this new version
     if s.snap != nil {
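The errgroup fan-out in IntermediateRoot is the heart of the parallel-updates half of this change. Below is a minimal sketch of the same shape, assuming golang.org/x/sync/errgroup; updateRoot is stubbed with a slice write, and the Verkle case is just a boolean here:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var (
		workers errgroup.Group
		roots   = make([]string, 4) // distinct slots, safe for concurrent writes
	)
	verkle := false // one merged trie: no concurrent updates supported yet
	if verkle {
		workers.SetLimit(1) // degrade gracefully to sequential execution
	}
	for i := 0; i < len(roots); i++ {
		i := i // capture the loop variable for the closure (pre-Go 1.22 idiom)
		workers.Go(func() error {
			roots[i] = fmt.Sprintf("root-%d", i) // stand-in for obj.updateRoot()
			return nil
		})
	}
	workers.Wait() // every storage root is final beyond this point
	fmt.Println(roots)
}
```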

@@ -17,6 +17,7 @@
 package state

 import (
+    "errors"
     "sync"

     "github.com/ethereum/go-ethereum/common"
@@ -27,6 +28,10 @@ import (
 var (
     // triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
     triePrefetchMetricsPrefix = "trie/prefetch/"
+
+    // errTerminated is returned if a fetcher is attempted to be operated after it
+    // has already terminated.
+    errTerminated = errors.New("fetcher is already terminated")
 )

 // triePrefetcher is an active prefetcher, which receives accounts or storage
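errTerminated pairs with the new term channel below: a closed channel doubles as a sticky "terminated" flag that any number of callers can test without locks. A standalone sketch of that idiom, using toy types rather than geth's:

```go
package main

import (
	"errors"
	"fmt"
)

var errTerminated = errors.New("fetcher is already terminated")

type fetcher struct {
	term chan struct{} // closed (never written to) to signal termination
}

func (f *fetcher) schedule() error {
	select {
	case <-f.term: // receives immediately once term is closed
		return errTerminated
	default: // still alive, fall through and accept the task
	}
	return nil
}

func main() {
	f := &fetcher{term: make(chan struct{})}
	fmt.Println(f.schedule()) // <nil>
	close(f.term)
	fmt.Println(f.schedule()) // fetcher is already terminated
}
```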
@@ -37,160 +42,126 @@ var (
 type triePrefetcher struct {
     db       Database               // Database to fetch trie nodes through
     root     common.Hash            // Root hash of the account trie for metrics
-    fetches  map[string]Trie        // Partially or fully fetched tries. Only populated for inactive copies.
     fetchers map[string]*subfetcher // Subfetchers for each trie
+    term     chan struct{}          // Channel to signal interruption

     deliveryMissMeter metrics.Meter
     accountLoadMeter  metrics.Meter
     accountDupMeter   metrics.Meter
-    accountSkipMeter  metrics.Meter
     accountWasteMeter metrics.Meter
     storageLoadMeter  metrics.Meter
     storageDupMeter   metrics.Meter
-    storageSkipMeter  metrics.Meter
     storageWasteMeter metrics.Meter
 }

 func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
     prefix := triePrefetchMetricsPrefix + namespace
-    p := &triePrefetcher{
+    return &triePrefetcher{
         db:       db,
         root:     root,
         fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
+        term:     make(chan struct{}),

         deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
         accountLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
         accountDupMeter:   metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
-        accountSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
         accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
         storageLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
         storageDupMeter:   metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
-        storageSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
         storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
     }
-    return p
 }

-// close iterates over all the subfetchers, aborts any that were left spinning
-// and reports the stats to the metrics subsystem.
-func (p *triePrefetcher) close() {
+// terminate iterates over all the subfetchers and issues a termination request
+// to all of them. Depending on the async parameter, the method will either block
+// until all subfetchers spin down, or return immediately.
+func (p *triePrefetcher) terminate(async bool) {
+    // Short circuit if the fetcher is already closed
+    select {
+    case <-p.term:
+        return
+    default:
+    }
+    // Terminate all sub-fetchers, sync or async, depending on the request
     for _, fetcher := range p.fetchers {
-        fetcher.abort() // safe to do multiple times
-
-        if metrics.Enabled {
-            if fetcher.root == p.root {
-                p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
-                p.accountDupMeter.Mark(int64(fetcher.dups))
-                p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
-
-                for _, key := range fetcher.used {
-                    delete(fetcher.seen, string(key))
-                }
-                p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
-            } else {
-                p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
-                p.storageDupMeter.Mark(int64(fetcher.dups))
-                p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
-
-                for _, key := range fetcher.used {
-                    delete(fetcher.seen, string(key))
-                }
-                p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
-            }
-        }
+        fetcher.terminate(async)
     }
-    // Clear out all fetchers (will crash on a second call, deliberate)
-    p.fetchers = nil
+    close(p.term)
 }

-// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
-// already loaded will be copied over, but no goroutines will be started. This
-// is mostly used in the miner which creates a copy of its actively mutated
-// state to be sealed while it may further mutate the state.
-func (p *triePrefetcher) copy() *triePrefetcher {
-    copy := &triePrefetcher{
-        db:      p.db,
-        root:    p.root,
-        fetches: make(map[string]Trie), // Inactive prefetchers use the fetches map
-
-        deliveryMissMeter: p.deliveryMissMeter,
-        accountLoadMeter:  p.accountLoadMeter,
-        accountDupMeter:   p.accountDupMeter,
-        accountSkipMeter:  p.accountSkipMeter,
-        accountWasteMeter: p.accountWasteMeter,
-        storageLoadMeter:  p.storageLoadMeter,
-        storageDupMeter:   p.storageDupMeter,
-        storageSkipMeter:  p.storageSkipMeter,
-        storageWasteMeter: p.storageWasteMeter,
-    }
-    // If the prefetcher is already a copy, duplicate the data
-    if p.fetches != nil {
-        for root, fetch := range p.fetches {
-            if fetch == nil {
-                continue
-            }
-            copy.fetches[root] = p.db.CopyTrie(fetch)
-        }
-        return copy
-    }
-    // Otherwise we're copying an active fetcher, retrieve the current states
-    for id, fetcher := range p.fetchers {
-        copy.fetches[id] = fetcher.peek()
-    }
-    return copy
+// report aggregates the pre-fetching and usage metrics and reports them.
+func (p *triePrefetcher) report() {
+    if !metrics.Enabled {
+        return
+    }
+    for _, fetcher := range p.fetchers {
+        fetcher.wait() // ensure the fetcher's idle before poking in its internals
+
+        if fetcher.root == p.root {
+            p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
+            p.accountDupMeter.Mark(int64(fetcher.dups))
+            for _, key := range fetcher.used {
+                delete(fetcher.seen, string(key))
+            }
+            p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
+        } else {
+            p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
+            p.storageDupMeter.Mark(int64(fetcher.dups))
+            for _, key := range fetcher.used {
+                delete(fetcher.seen, string(key))
+            }
+            p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
+        }
+    }
 }

-// prefetch schedules a batch of trie items to prefetch.
-func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) {
-    // If the prefetcher is an inactive one, bail out
-    if p.fetches != nil {
-        return
-    }
-    // Active fetcher, schedule the retrievals
+// prefetch schedules a batch of trie items to prefetch. After the prefetcher is
+// closed, any subsequently scheduled tasks will not be executed and an error
+// will be returned.
+//
+// prefetch is called from two locations:
+//
+//  1. Finalize of the state-objects storage roots. This happens at the end
+//     of every transaction, meaning that if several transactions touch upon
+//     the same contract, the parameters invoking this method may be repeated.
+//  2. Finalize of the main account trie. This happens only once per block.
+func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
+    // Ensure the subfetcher is still alive
+    select {
+    case <-p.term:
+        return errTerminated
+    default:
+    }
     id := p.trieID(owner, root)
     fetcher := p.fetchers[id]
     if fetcher == nil {
         fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
         p.fetchers[id] = fetcher
     }
-    fetcher.schedule(keys)
+    return fetcher.schedule(keys)
 }

-// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
-// have it.
-func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
-    // If the prefetcher is inactive, return from existing deep copies
-    id := p.trieID(owner, root)
-    if p.fetches != nil {
-        trie := p.fetches[id]
-        if trie == nil {
-            p.deliveryMissMeter.Mark(1)
-            return nil
-        }
-        return p.db.CopyTrie(trie)
-    }
-    // Otherwise the prefetcher is active, bail if no trie was prefetched for this root
-    fetcher := p.fetchers[id]
+// trie returns the trie matching the root hash, blocking until the fetcher of
+// the given trie terminates. If no fetcher exists for the request, nil will be
+// returned.
+func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) (Trie, error) {
+    // Bail if no trie was prefetched for this root
+    fetcher := p.fetchers[p.trieID(owner, root)]
     if fetcher == nil {
+        log.Error("Prefetcher missed to load trie", "owner", owner, "root", root)
         p.deliveryMissMeter.Mark(1)
-        return nil
+        return nil, nil
     }
-    // Interrupt the prefetcher if it's by any chance still running and return
-    // a copy of any pre-loaded trie.
-    fetcher.abort() // safe to do multiple times
-
-    trie := fetcher.peek()
-    if trie == nil {
-        p.deliveryMissMeter.Mark(1)
-        return nil
-    }
-    return trie
+    // Subfetcher exists, retrieve its trie
+    return fetcher.peek(), nil
 }

 // used marks a batch of state items used to allow creating statistics as to
-// how useful or wasteful the prefetcher is.
+// how useful or wasteful the fetcher is.
 func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
     if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
+        fetcher.wait() // ensure the fetcher's idle before poking in its internals
         fetcher.used = used
     }
 }
@@ -218,10 +189,9 @@ type subfetcher struct {
     tasks [][]byte   // Items queued up for retrieval
     lock  sync.Mutex // Lock protecting the task queue

     wake chan struct{}  // Wake channel if a new task is scheduled
     stop chan struct{}  // Channel to interrupt processing
     term chan struct{}  // Channel to signal interruption
-    copy chan chan Trie // Channel to request a copy of the current trie

     seen map[string]struct{} // Tracks the entries already loaded
     dups int                 // Number of duplicate preload tasks
@@ -240,7 +210,6 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
         wake: make(chan struct{}, 1),
         stop: make(chan struct{}),
         term: make(chan struct{}),
-        copy: make(chan chan Trie),
         seen: make(map[string]struct{}),
     }
     go sf.loop()
@@ -248,50 +217,61 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
 }

 // schedule adds a batch of trie keys to the queue to prefetch.
-func (sf *subfetcher) schedule(keys [][]byte) {
+func (sf *subfetcher) schedule(keys [][]byte) error {
+    // Ensure the subfetcher is still alive
+    select {
+    case <-sf.term:
+        return errTerminated
+    default:
+    }
     // Append the tasks to the current queue
     sf.lock.Lock()
     sf.tasks = append(sf.tasks, keys...)
     sf.lock.Unlock()

-    // Notify the prefetcher, it's fine if it's already terminated
+    // Notify the background thread to execute scheduled tasks
     select {
     case sf.wake <- struct{}{}:
+        // Wake signal sent
     default:
+        // Wake signal not sent as a previous one is already queued
     }
+    return nil
 }

-// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
-// is currently.
-func (sf *subfetcher) peek() Trie {
-    ch := make(chan Trie)
-    select {
-    case sf.copy <- ch:
-        // Subfetcher still alive, return copy from it
-        return <-ch
-
-    case <-sf.term:
-        // Subfetcher already terminated, return a copy directly
-        if sf.trie == nil {
-            return nil
-        }
-        return sf.db.CopyTrie(sf.trie)
-    }
+// wait blocks until the subfetcher terminates. This method is used to block on
+// an async termination before accessing internal fields from the fetcher.
+func (sf *subfetcher) wait() {
+    <-sf.term
 }

-// abort interrupts the subfetcher immediately. It is safe to call abort multiple
-// times but it is not thread safe.
-func (sf *subfetcher) abort() {
+// peek retrieves the fetcher's trie, populated with any pre-fetched data. The
+// returned trie will be a shallow copy, so modifying it will break subsequent
+// peeks for the original data. The method will block until all the scheduled
+// data has been loaded and the fetcher terminated.
+func (sf *subfetcher) peek() Trie {
+    // Block until the fetcher terminates, then retrieve the trie
+    sf.wait()
+    return sf.trie
+}
+
+// terminate requests the subfetcher to stop accepting new tasks and spin down
+// as soon as everything is loaded. Depending on the async parameter, the method
+// will either block until all disk loads finish or return immediately.
+func (sf *subfetcher) terminate(async bool) {
     select {
     case <-sf.stop:
     default:
         close(sf.stop)
     }
+    if async {
+        return
+    }
     <-sf.term
 }
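The reason peek may read sf.trie without any locking is the memory-model guarantee behind wait: closing a channel happens-before any receive that observes the close, so once wait returns, the loop goroutine has exited and all of its writes are visible. A minimal sketch of that guarantee, with a hypothetical fetcher type:

```go
package main

import "fmt"

type fetcher struct {
	result string        // written only by the loop goroutine
	term   chan struct{} // closed when the loop exits
}

func (f *fetcher) loop() {
	defer close(f.term) // publishes all prior writes to receivers on term
	f.result = "loaded trie"
}

func (f *fetcher) wait() { <-f.term } // blocks until the loop has exited

func (f *fetcher) peek() string {
	f.wait()        // after this, reading result is race-free
	return f.result // no mutex needed: the writer goroutine is gone
}

func main() {
	f := &fetcher{term: make(chan struct{})}
	go f.loop()
	fmt.Println(f.peek())
}
```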
-// loop waits for new tasks to be scheduled and keeps loading them until it runs
-// out of tasks or its underlying trie is retrieved for committing.
+// loop loads newly-scheduled trie tasks as they are received, terminating when
+// requested.
 func (sf *subfetcher) loop() {
     // No matter how the loop stops, signal anyone waiting that it's terminated
     defer close(sf.term)
@@ -305,8 +285,6 @@ func (sf *subfetcher) loop() {
         }
         sf.trie = trie
     } else {
-        // The trie argument can be nil as verkle doesn't support prefetching
-        // yet. TODO FIX IT(rjl493456442), otherwise code will panic here.
         trie, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
         if err != nil {
             log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
@@ -318,48 +296,38 @@ func (sf *subfetcher) loop() {
     for {
         select {
         case <-sf.wake:
-            // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
+            // Execute all remaining tasks in a single run
             sf.lock.Lock()
             tasks := sf.tasks
             sf.tasks = nil
             sf.lock.Unlock()

-            // Prefetch any tasks until the loop is interrupted
-            for i, task := range tasks {
-                select {
-                case <-sf.stop:
-                    // If termination is requested, add any leftover back and return
-                    sf.lock.Lock()
-                    sf.tasks = append(sf.tasks, tasks[i:]...)
-                    sf.lock.Unlock()
-                    return
-
-                case ch := <-sf.copy:
-                    // Somebody wants a copy of the current trie, grant them
-                    ch <- sf.db.CopyTrie(sf.trie)
-
-                default:
-                    // No termination request yet, prefetch the next entry
-                    if _, ok := sf.seen[string(task)]; ok {
-                        sf.dups++
-                    } else {
-                        if len(task) == common.AddressLength {
-                            sf.trie.GetAccount(common.BytesToAddress(task))
-                        } else {
-                            sf.trie.GetStorage(sf.addr, task)
-                        }
-                        sf.seen[string(task)] = struct{}{}
-                    }
+            for _, task := range tasks {
+                if _, ok := sf.seen[string(task)]; ok {
+                    sf.dups++
+                    continue
+                }
+                if len(task) == common.AddressLength {
+                    sf.trie.GetAccount(common.BytesToAddress(task))
+                } else {
+                    sf.trie.GetStorage(sf.addr, task)
                 }
+                sf.seen[string(task)] = struct{}{}
             }

-        case ch := <-sf.copy:
-            // Somebody wants a copy of the current trie, grant them
-            ch <- sf.db.CopyTrie(sf.trie)
-
         case <-sf.stop:
-            // Termination is requested, abort and leave remaining tasks
-            return
+            // Termination is requested, abort if no more tasks are pending. If
+            // there are some, exhaust them first.
+            sf.lock.Lock()
+            done := sf.tasks == nil
+            sf.lock.Unlock()
+
+            if done {
+                return
+            }
+            // Some tasks are pending, loop and pick them up (that wake branch
+            // will be selected eventually, whilst stop remains closed, so this
+            // branch will also run afterwards).
         }
     }
 }
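The subtle part of the new loop is the stop branch: termination only wins once the task queue is drained, with the wake branch doing the draining. A toy, runnable reduction of that shutdown dance (all names are illustrative, not geth's):

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	lock  sync.Mutex
	tasks []int
	wake  chan struct{} // capacity 1: a single pending wake-up is enough
	stop  chan struct{}
	term  chan struct{}
}

func (w *worker) loop() {
	defer close(w.term) // signal waiters no matter how the loop exits
	for {
		select {
		case <-w.wake:
			w.lock.Lock()
			tasks := w.tasks
			w.tasks = nil
			w.lock.Unlock()
			for _, t := range tasks {
				fmt.Println("loaded", t) // stand-in for the trie reads
			}
		case <-w.stop:
			w.lock.Lock()
			done := w.tasks == nil
			w.lock.Unlock()
			if done {
				return // nothing pending, spin down
			}
			// Pending tasks remain: keep looping; with both channels ready,
			// select eventually picks the wake branch and drains the queue.
		}
	}
}

func main() {
	w := &worker{wake: make(chan struct{}, 1), stop: make(chan struct{}), term: make(chan struct{})}
	go w.loop()
	w.lock.Lock()
	w.tasks = append(w.tasks, 1, 2, 3)
	w.lock.Unlock()
	w.wake <- struct{}{}
	close(w.stop) // request termination, i.e. terminate(true)...
	<-w.term      // ...then block on it, i.e. terminate(false)
}
```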

@@ -19,7 +19,6 @@ package state
 import (
     "math/big"
     "testing"
-    "time"

     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/rawdb"
@@ -46,68 +45,20 @@ func filledStateDB() *StateDB {
     return state
 }

-func TestCopyAndClose(t *testing.T) {
+func TestUseAfterTerminate(t *testing.T) {
     db := filledStateDB()
     prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
     skey := common.HexToHash("aaa")
-    prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    time.Sleep(1 * time.Second)
-    a := prefetcher.trie(common.Hash{}, db.originalRoot)
-    prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    b := prefetcher.trie(common.Hash{}, db.originalRoot)
-    cpy := prefetcher.copy()
-    cpy.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    cpy.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    c := cpy.trie(common.Hash{}, db.originalRoot)
-    prefetcher.close()
-    cpy2 := cpy.copy()
-    cpy2.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    d := cpy2.trie(common.Hash{}, db.originalRoot)
-    cpy.close()
-    cpy2.close()
-    if a.Hash() != b.Hash() || a.Hash() != c.Hash() || a.Hash() != d.Hash() {
-        t.Fatalf("Invalid trie, hashes should be equal: %v %v %v %v", a.Hash(), b.Hash(), c.Hash(), d.Hash())
-    }
-}
-
-func TestUseAfterClose(t *testing.T) {
-    db := filledStateDB()
-    prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
-    skey := common.HexToHash("aaa")
-    prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    a := prefetcher.trie(common.Hash{}, db.originalRoot)
-    prefetcher.close()
-    b := prefetcher.trie(common.Hash{}, db.originalRoot)
-    if a == nil {
-        t.Fatal("Prefetching before close should not return nil")
-    }
-    if b != nil {
-        t.Fatal("Trie after close should return nil")
-    }
-}
-
-func TestCopyClose(t *testing.T) {
-    db := filledStateDB()
-    prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
-    skey := common.HexToHash("aaa")
-    prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()})
-    cpy := prefetcher.copy()
-    a := prefetcher.trie(common.Hash{}, db.originalRoot)
-    b := cpy.trie(common.Hash{}, db.originalRoot)
-    prefetcher.close()
-    c := prefetcher.trie(common.Hash{}, db.originalRoot)
-    d := cpy.trie(common.Hash{}, db.originalRoot)
-    if a == nil {
-        t.Fatal("Prefetching before close should not return nil")
-    }
-    if b == nil {
-        t.Fatal("Copy trie should not return nil")
-    }
-    if c != nil {
-        t.Fatal("Trie after close should return nil")
-    }
-    if d == nil {
-        t.Fatal("Copy trie should not return nil")
-    }
-}
+    if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil {
+        t.Errorf("Prefetch failed before terminate: %v", err)
+    }
+    prefetcher.terminate(false)
+
+    if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil {
+        t.Errorf("Prefetch succeeded after terminate: %v", err)
+    }
+    if _, err := prefetcher.trie(common.Hash{}, db.originalRoot); err != nil {
+        t.Errorf("Trie retrieval failed after terminate: %v", err)
+    }
+}
