core/state: parallelise parts of state commit (#29681)

* core/state, internal/workerpool: parallelize parts of state commit

* core, internal: move workerpool into syncx

* core/state: use errgroups, commit accounts concurrently

* core: resurrect detailed commit timers to almost-accuracy
pull/29696/head
Péter Szilágyi 5 months ago committed by GitHub
parent 9f96e07c1c
commit 682ee820fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      core/blockchain.go
  2. 3
      core/state/state_object.go
  3. 98
      core/state/statedb.go

@ -1963,7 +1963,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them
blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits) blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start) blockInsertTimer.UpdateSince(start)
return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil

@ -403,6 +403,9 @@ func (s *stateObject) updateRoot() {
// commit obtains a set of dirty storage trie nodes and updates the account data. // commit obtains a set of dirty storage trie nodes and updates the account data.
// The returned set can be nil if nothing to commit. This function assumes all // The returned set can be nil if nothing to commit. This function assumes all
// storage mutations have already been flushed into trie by updateRoot. // storage mutations have already been flushed into trie by updateRoot.
//
// Note, commit may run concurrently across all the state objects. Do not assume
// thread-safe access to the statedb.
func (s *stateObject) commit() (*trienode.NodeSet, error) { func (s *stateObject) commit() (*trienode.NodeSet, error) {
// Short circuit if trie is not even loaded, don't bother with committing anything // Short circuit if trie is not even loaded, don't bother with committing anything
if s.trie == nil { if s.trie == nil {

@ -23,6 +23,7 @@ import (
"math/big" "math/big"
"slices" "slices"
"sort" "sort"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -37,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/trie/triestate" "github.com/ethereum/go-ethereum/trie/triestate"
"github.com/holiman/uint256" "github.com/holiman/uint256"
"golang.org/x/sync/errgroup"
) )
type revision struct { type revision struct {
@ -1146,66 +1148,108 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er
storageTrieNodesUpdated int storageTrieNodesUpdated int
storageTrieNodesDeleted int storageTrieNodesDeleted int
nodes = trienode.NewMergedNodeSet() nodes = trienode.NewMergedNodeSet()
codeWriter = s.db.DiskDB().NewBatch()
) )
// Handle all state deletions first // Handle all state deletions first
if err := s.handleDestruction(nodes); err != nil { if err := s.handleDestruction(nodes); err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
// Handle all state updates afterwards // Handle all state updates afterwards, concurrently to one another to shave
// off some milliseconds from the commit operation. Also accumulate the code
// writes to run in parallel with the computations.
start := time.Now() start := time.Now()
var (
code = s.db.DiskDB().NewBatch()
lock sync.Mutex
root common.Hash
workers errgroup.Group
)
// Schedule the account trie first since that will be the biggest, so give
// it the most time to crunch.
//
// TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain
// heads, which seems excessive given that it doesn't do hashing, it just
// shuffles some data. For comparison, the *hashing* at chain head is 2-3ms.
// We need to investigate what's happening as it seems something's wonky.
// Obviously it's not an end of the world issue, just something the original
// code didn't anticipate for.
workers.Go(func() error {
// Write the account trie changes, measuring the amount of wasted time
newroot, set, err := s.trie.Commit(true)
if err != nil {
return err
}
root = newroot
// Merge the dirty nodes of account trie into global set
lock.Lock()
defer lock.Unlock()
if set != nil {
if err = nodes.Merge(set); err != nil {
return err
}
accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
}
s.AccountCommits = time.Since(start)
return nil
})
// Schedule each of the storage tries that need to be updated, so they can
// run concurrently to one another.
//
// TODO(karalabe): Experimentally, the account commit takes approximately the
// same time as all the storage commits combined, so we could maybe only have
// 2 threads in total. But that kind of depends on the account commit being
// more expensive than it should be, so let's fix that and revisit this todo.
for addr, op := range s.mutations { for addr, op := range s.mutations {
if op.isDelete() { if op.isDelete() {
continue continue
} }
obj := s.stateObjects[addr]
// Write any contract code associated with the state object // Write any contract code associated with the state object
obj := s.stateObjects[addr]
if obj.code != nil && obj.dirtyCode { if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false obj.dirtyCode = false
} }
// Run the storage updates concurrently to one another
workers.Go(func() error {
// Write any storage changes in the state object to its storage trie // Write any storage changes in the state object to its storage trie
set, err := obj.commit() set, err := obj.commit()
if err != nil { if err != nil {
return common.Hash{}, err return err
} }
// Merge the dirty nodes of storage trie into global set. It is possible // Merge the dirty nodes of storage trie into global set. It is possible
// that the account was destructed and then resurrected in the same block. // that the account was destructed and then resurrected in the same block.
// In this case, the node set is shared by both accounts. // In this case, the node set is shared by both accounts.
lock.Lock()
defer lock.Unlock()
if set != nil { if set != nil {
if err := nodes.Merge(set); err != nil { if err = nodes.Merge(set); err != nil {
return common.Hash{}, err return err
} }
updates, deleted := set.Size() updates, deleted := set.Size()
storageTrieNodesUpdated += updates storageTrieNodesUpdated += updates
storageTrieNodesDeleted += deleted storageTrieNodesDeleted += deleted
} }
s.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime
return nil
})
} }
s.StorageCommits += time.Since(start) // Schedule the code commits to run concurrently too. This shouldn't really
// take much since we don't often commit code, but since it's disk access,
if codeWriter.ValueSize() > 0 { // it's always yolo.
if err := codeWriter.Write(); err != nil { workers.Go(func() error {
if code.ValueSize() > 0 {
if err := code.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err) log.Crit("Failed to commit dirty codes", "error", err)
} }
} }
// Write the account trie changes, measuring the amount of wasted time return nil
start = time.Now() })
// Wait for everything to finish and update the metrics
root, set, err := s.trie.Commit(true) if err := workers.Wait(); err != nil {
if err != nil {
return common.Hash{}, err
}
// Merge the dirty nodes of account trie into global set
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
}
// Report the commit metrics
s.AccountCommits += time.Since(start)
accountUpdatedMeter.Mark(int64(s.AccountUpdated)) accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated)) storageUpdatedMeter.Mark(int64(s.StorageUpdated))
accountDeletedMeter.Mark(int64(s.AccountDeleted)) accountDeletedMeter.Mark(int64(s.AccountDeleted))

Loading…
Cancel
Save