trie: parallize committer

pull/30461/head
stevemilk 2 months ago
parent 0dd7e82c0a
commit 7c25f135ef
  1. 4
      core/state/statedb.go
  2. 128
      trie/committer.go
  3. 33
      trie/trie.go
  4. 4
      trie/trienode/node.go
  5. 2
      trie/trienode/node_test.go
  6. 2
      trie/verkle.go

@ -945,7 +945,7 @@ func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash,
slots = make(map[common.Hash][]byte)
)
stack := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
nodes.AddNode(path, trienode.NewDeleted())
nodes.AddNode(string(path), trienode.NewDeleted())
})
for iter.Next() {
slot := common.CopyBytes(iter.Slot())
@ -991,7 +991,7 @@ func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, r
if it.Hash() == (common.Hash{}) {
continue
}
nodes.AddNode(it.Path(), trienode.NewDeleted())
nodes.AddNode(string(it.Path()), trienode.NewDeleted())
}
if err := it.Error(); err != nil {
return nil, nil, err

@ -18,6 +18,8 @@ package trie
import (
"fmt"
"runtime"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie/trienode"
@ -30,28 +32,44 @@ type committer struct {
nodes *trienode.NodeSet
tracer *tracer
collectLeaf bool
parallel bool
}
// newCommitter creates a new committer or picks one from the pool.
func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) *committer {
func newCommitter(nodes *trienode.NodeSet, tracer *tracer, collectLeaf bool, parallel bool) *committer {
return &committer{
nodes: nodeset,
nodes: nodes,
tracer: tracer,
collectLeaf: collectLeaf,
parallel: parallel,
}
}
type wrapNode struct {
node *trienode.Node
path string
leafHash common.Hash // optional, the parent hash of the relative leaf
leafBlob []byte // optional, the blob of the relative leaf
}
// Commit collapses a node down into a hash node.
func (c *committer) Commit(n node) hashNode {
return c.commit(nil, n).(hashNode)
hn, wnodes := c.commit(nil, n, true)
for _, wn := range wnodes {
c.nodes.AddNode(wn.path, wn.node)
if wn.leafHash != (common.Hash{}) {
c.nodes.AddLeaf(wn.leafHash, wn.leafBlob)
}
}
return hn.(hashNode)
}
// commit collapses a node down into a hash node and returns it.
func (c *committer) commit(path []byte, n node) node {
func (c *committer) commit(path []byte, n node, topmost bool) (node, []*wrapNode) {
// if this path is clean, use available cached data
hash, dirty := n.cache()
if hash != nil && !dirty {
return hash
return hash, nil
}
// Commit children, then parent, and remove the dirty flag.
switch cn := n.(type) {
@ -61,38 +79,72 @@ func (c *committer) commit(path []byte, n node) node {
// If the child is fullNode, recursively commit,
// otherwise it can only be hashNode or valueNode.
var nodes []*wrapNode
if _, ok := cn.Val.(*fullNode); ok {
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val)
collapsed.Val, nodes = c.commit(append(path, cn.Key...), cn.Val, false)
}
// The key needs to be copied, since we're adding it to the
// modified nodeset.
collapsed.Key = hexToCompact(cn.Key)
hashedNode := c.store(path, collapsed)
hashedNode, wNode := c.store(path, collapsed)
if wNode != nil {
nodes = append(nodes, wNode)
}
if hn, ok := hashedNode.(hashNode); ok {
return hn
return hn, nodes
}
return collapsed
return collapsed, nodes
case *fullNode:
hashedKids := c.commitChildren(path, cn)
hashedKids, nodes := c.commitChildren(path, cn, topmost && c.parallel)
collapsed := cn.copy()
collapsed.Children = hashedKids
hashedNode := c.store(path, collapsed)
hashedNode, wNode := c.store(path, collapsed)
if wNode != nil {
nodes = append(nodes, wNode)
}
if hn, ok := hashedNode.(hashNode); ok {
return hn
return hn, nodes
}
return collapsed
return collapsed, nodes
case hashNode:
return cn
return cn, nil
default:
// nil, valuenode shouldn't be committed
panic(fmt.Sprintf("%T: invalid node: %v", n, n))
}
}
type task struct {
node node
index int
path []byte
}
// commitChildren commits the children of the given fullnode
func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
var children [17]node
func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) ([17]node, []*wrapNode) {
var (
wg sync.WaitGroup
children [17]node
results [16][]*wrapNode
tasks = make(chan task)
)
if parallel {
worker := func() {
defer wg.Done()
for t := range tasks {
children[t.index], results[t.index] = c.commit(t.path, t.node, false)
}
}
threads := runtime.NumCPU()
if threads > 16 {
threads = 16
}
for i := 0; i < threads; i++ {
wg.Add(1)
go worker()
}
}
for i := 0; i < 16; i++ {
child := n.Children[i]
if child == nil {
@ -108,18 +160,36 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
// Commit the child recursively and store the "hashed" value.
// Note the returned node can be some embedded nodes, so it's
// possible the type is not hashNode.
children[i] = c.commit(append(path, byte(i)), child)
if !parallel {
children[i], results[i] = c.commit(append(path, byte(i)), child, false)
} else {
tasks <- task{
index: i,
node: child,
path: append(path, byte(i)),
}
}
}
if parallel {
close(tasks)
wg.Wait()
}
// For the 17th child, it's possible the type is valuenode.
if n.Children[16] != nil {
children[16] = n.Children[16]
}
return children
var wnodes []*wrapNode
for i := 0; i < 16; i++ {
if results[i] != nil {
wnodes = append(wnodes, results[i]...)
}
}
return children, wnodes
}
// store hashes the node n and adds it to the modified nodeset. If leaf collection
// is enabled, leaf nodes will be tracked in the modified nodeset as well.
func (c *committer) store(path []byte, n node) node {
func (c *committer) store(path []byte, n node) (node, *wrapNode) {
// Larger nodes are replaced by their hash and stored in the database.
var hash, _ = n.cache()
@ -133,25 +203,33 @@ func (c *committer) store(path []byte, n node) node {
// deleted only if the node was existent in database before.
_, ok := c.tracer.accessList[string(path)]
if ok {
c.nodes.AddNode(path, trienode.NewDeleted())
return n, &wrapNode{
path: string(path),
node: trienode.NewDeleted(),
}
}
return n
return n, nil
}
// Collect the dirty node to nodeset for return.
nhash := common.BytesToHash(hash)
c.nodes.AddNode(path, trienode.New(nhash, nodeToBytes(n)))
wNode := &wrapNode{
path: string(path),
node: trienode.New(nhash, nodeToBytes(n)),
}
// Collect the corresponding leaf node if it's required. We don't check
// full node since it's impossible to store value in fullNode. The key
// length of leaves should be exactly same.
// length of leaves should be exactly same..
if c.collectLeaf {
if sn, ok := n.(*shortNode); ok {
if val, ok := sn.Val.(valueNode); ok {
c.nodes.AddLeaf(nhash, val)
wNode.leafHash = nhash
wNode.leafBlob = val
}
}
}
return hash
return hash, wNode
}
// ForGatherChildren decodes the provided node and traverses the children inside.

@ -44,16 +44,17 @@ type Trie struct {
// trie is not usable(latest states is invisible).
committed bool
// Keep track of the number leaves which have been inserted since the last
// hashing operation. This number will not directly map to the number of
// actually unhashed nodes.
unhashed int
// reader is the handler trie can retrieve nodes from.
reader *trieReader
// tracer is the tool to track the trie changes.
tracer *tracer
// The number of trie mutations that have been performed
mutate int
// The number of mutations that have been hashed
hashed int
}
// newFlag returns the cache flag value for a newly created node.
@ -67,9 +68,10 @@ func (t *Trie) Copy() *Trie {
root: t.root,
owner: t.owner,
committed: t.committed,
unhashed: t.unhashed,
reader: t.reader,
tracer: t.tracer.copy(),
mutate: t.mutate,
hashed: t.hashed,
}
}
@ -304,11 +306,12 @@ func (t *Trie) Update(key, value []byte) error {
if t.committed {
return ErrCommitted
}
t.mutate++
return t.update(key, value)
}
func (t *Trie) update(key, value []byte) error {
t.unhashed++
t.mutate++
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
@ -422,7 +425,7 @@ func (t *Trie) Delete(key []byte) error {
if t.committed {
return ErrCommitted
}
t.unhashed++
t.mutate++
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
if err != nil {
@ -622,7 +625,7 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
}
nodes := trienode.NewNodeSet(t.owner)
for _, path := range paths {
nodes.AddNode([]byte(path), trienode.NewDeleted())
nodes.AddNode(path, trienode.NewDeleted())
}
return types.EmptyRootHash, nodes // case (b)
}
@ -640,9 +643,10 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
}
nodes := trienode.NewNodeSet(t.owner)
for _, path := range t.tracer.deletedNodes() {
nodes.AddNode([]byte(path), trienode.NewDeleted())
nodes.AddNode(path, trienode.NewDeleted())
}
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root)
t.root = newCommitter(nodes, t.tracer, collectLeaf, t.mutate > 100).Commit(t.root)
t.mutate = 0
return rootHash, nodes
}
@ -652,10 +656,10 @@ func (t *Trie) hashRoot() (node, node) {
return hashNode(types.EmptyRootHash.Bytes()), nil
}
// If the number of changes is below 100, we let one thread handle it
h := newHasher(t.unhashed >= 100)
h := newHasher(t.mutate-t.hashed >= 100)
defer func() {
returnHasherToPool(h)
t.unhashed = 0
t.hashed = t.mutate
}()
hashed, cached := h.hash(t.root, true)
return hashed, cached
@ -677,7 +681,8 @@ func (t *Trie) Witness() map[string]struct{} {
func (t *Trie) Reset() {
t.root = nil
t.owner = common.Hash{}
t.unhashed = 0
t.tracer.reset()
t.committed = false
t.hashed = 0
t.mutate = 0
}

@ -90,13 +90,13 @@ func (set *NodeSet) ForEachWithOrder(callback func(path string, n *Node)) {
}
// AddNode adds the provided node into set.
func (set *NodeSet) AddNode(path []byte, n *Node) {
func (set *NodeSet) AddNode(path string, n *Node) {
if n.IsDeleted() {
set.deletes += 1
} else {
set.updates += 1
}
set.Nodes[string(path)] = n
set.Nodes[path] = n
}
// Merge adds a set of nodes into the set.

@ -42,7 +42,7 @@ func benchmarkMerge(b *testing.B, count int) {
blob := make([]byte, 32)
rand.Read(blob)
hash := crypto.Keccak256Hash(blob)
s.AddNode(path, New(hash, blob))
s.AddNode(string(path), New(hash, blob))
}
for i := 0; i < count; i++ {
// Random path of 4 nibbles

@ -258,7 +258,7 @@ func (t *VerkleTrie) Commit(_ bool) (common.Hash, *trienode.NodeSet) {
nodeset := trienode.NewNodeSet(common.Hash{})
for _, node := range nodes {
// Hash parameter is not used in pathdb
nodeset.AddNode(node.Path, trienode.New(common.Hash{}, node.SerializedBytes))
nodeset.AddNode(string(node.Path), trienode.New(common.Hash{}, node.SerializedBytes))
}
// Serialize root commitment form
return t.Hash(), nodeset

Loading…
Cancel
Save