trie: concurrent commit (#30545)

This change makes the trie commit operation concurrent if the number of changes exceeds 100.

Co-authored-by: stevemilk <wangpeculiar@gmail.com>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
release/1.14
Martin HS 1 month ago
parent 912a13700c
commit 118ed442c6
  1. 38
      trie/committer.go
  2. 13
      trie/trie.go
  3. 104
      trie/trie_test.go
  4. 18
      trie/trienode/node.go

@ -18,6 +18,7 @@ package trie
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie/trienode"
@ -42,12 +43,12 @@ func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) *
}
// Commit collapses a node down into a hash node.
func (c *committer) Commit(n node) hashNode {
return c.commit(nil, n).(hashNode)
func (c *committer) Commit(n node, parallel bool) hashNode {
return c.commit(nil, n, parallel).(hashNode)
}
// commit collapses a node down into a hash node and returns it.
func (c *committer) commit(path []byte, n node) node {
func (c *committer) commit(path []byte, n node, parallel bool) node {
// if this path is clean, use available cached data
hash, dirty := n.cache()
if hash != nil && !dirty {
@ -62,7 +63,7 @@ func (c *committer) commit(path []byte, n node) node {
// If the child is fullNode, recursively commit,
// otherwise it can only be hashNode or valueNode.
if _, ok := cn.Val.(*fullNode); ok {
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val)
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val, false)
}
// The key needs to be copied, since we're adding it to the
// modified nodeset.
@ -73,7 +74,7 @@ func (c *committer) commit(path []byte, n node) node {
}
return collapsed
case *fullNode:
hashedKids := c.commitChildren(path, cn)
hashedKids := c.commitChildren(path, cn, parallel)
collapsed := cn.copy()
collapsed.Children = hashedKids
@ -91,8 +92,12 @@ func (c *committer) commit(path []byte, n node) node {
}
// commitChildren commits the children of the given fullnode
func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
var children [17]node
func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) [17]node {
var (
wg sync.WaitGroup
nodesMu sync.Mutex
children [17]node
)
for i := 0; i < 16; i++ {
child := n.Children[i]
if child == nil {
@ -108,7 +113,24 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
// Commit the child recursively and store the "hashed" value.
// Note the returned node can be some embedded nodes, so it's
// possible the type is not hashNode.
children[i] = c.commit(append(path, byte(i)), child)
if !parallel {
children[i] = c.commit(append(path, byte(i)), child, false)
} else {
wg.Add(1)
go func(index int) {
p := append(path, byte(index))
childSet := trienode.NewNodeSet(c.nodes.Owner)
childCommitter := newCommitter(childSet, c.tracer, c.collectLeaf)
children[index] = childCommitter.commit(p, child, false)
nodesMu.Lock()
c.nodes.MergeSet(childSet)
nodesMu.Unlock()
wg.Done()
}(i)
}
}
if parallel {
wg.Wait()
}
// For the 17th child, it's possible the type is valuenode.
if n.Children[16] != nil {

@ -49,6 +49,9 @@ type Trie struct {
// actually unhashed nodes.
unhashed int
// uncommitted is the number of updates since last commit.
uncommitted int
// reader is the handler trie can retrieve nodes from.
reader *trieReader
@ -67,9 +70,10 @@ func (t *Trie) Copy() *Trie {
root: t.root,
owner: t.owner,
committed: t.committed,
unhashed: t.unhashed,
reader: t.reader,
tracer: t.tracer.copy(),
uncommitted: t.uncommitted,
unhashed: t.unhashed,
}
}
@ -309,6 +313,7 @@ func (t *Trie) Update(key, value []byte) error {
func (t *Trie) update(key, value []byte) error {
t.unhashed++
t.uncommitted++
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
@ -422,6 +427,7 @@ func (t *Trie) Delete(key []byte) error {
if t.committed {
return ErrCommitted
}
t.uncommitted++
t.unhashed++
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
@ -642,7 +648,9 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
for _, path := range t.tracer.deletedNodes() {
nodes.AddNode([]byte(path), trienode.NewDeleted())
}
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root)
// If the number of changes is 100 or fewer, a single thread handles the
// commit; only above that threshold is the commit parallelized.
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root, t.uncommitted > 100)
t.uncommitted = 0
return rootHash, nodes
}
@ -678,6 +686,7 @@ func (t *Trie) Reset() {
t.root = nil
t.owner = common.Hash{}
t.unhashed = 0
t.uncommitted = 0
t.tracer.reset()
t.committed = false
}

@ -26,6 +26,7 @@ import (
"math/rand"
"reflect"
"sort"
"strings"
"testing"
"testing/quick"
@ -35,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/testrand"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/holiman/uint256"
@ -1206,3 +1208,105 @@ func FuzzTrie(f *testing.F) {
}
})
}
// BenchmarkCommit measures trie commit performance across a range of trie
// sizes, exercising both the sequential and the parallel committer.
func BenchmarkCommit(b *testing.B) {
	for _, size := range []int{100, 500, 2000, 5000} {
		benchmarkCommit(b, size)
	}
}
// benchmarkCommit runs the commit benchmark for tries of n nodes, once with
// the single-threaded committer and once with the parallel one.
func benchmarkCommit(b *testing.B, n int) {
	for _, parallel := range []bool{false, true} {
		mode := "sequential"
		if parallel {
			mode = "parallel"
		}
		b.Run(fmt.Sprintf("commit-%vnodes-%s", n, mode), func(b *testing.B) {
			testCommit(b, n, parallel)
		})
	}
}
// testCommit benchmarks committing b.N tries with n random leaves each.
// When parallel is false, the uncommitted counter is zeroed after hashing so
// the committer takes the single-threaded path regardless of the change count.
func testCommit(b *testing.B, n int, parallel bool) {
	tries := make([]*Trie, b.N)
	for i := range tries {
		tr := NewEmpty(nil)
		for j := 0; j < n; j++ {
			tr.Update(testrand.Bytes(32), testrand.Bytes(32))
		}
		tr.Hash()
		if !parallel {
			// Pretend no updates happened since the last commit, forcing
			// the sequential code path.
			tr.uncommitted = 0
		}
		tries[i] = tr
	}
	b.ResetTimer()
	b.ReportAllocs()
	for _, tr := range tries {
		tr.Commit(true)
	}
}
// TestCommitCorrect verifies that the parallel committer produces exactly the
// same root hash and node set as the sequential committer.
func TestCommitCorrect(t *testing.T) {
	var (
		paraTrie = NewEmpty(nil)
		refTrie  = NewEmpty(nil)
	)
	// Build two identical tries, large enough (>100 changes) that paraTrie
	// commits on the parallel path.
	for j := 0; j < 5000; j++ {
		key := testrand.Bytes(32)
		val := testrand.Bytes(32)
		paraTrie.Update(key, val)
		refTrie.Update(common.CopyBytes(key), common.CopyBytes(val))
	}
	paraTrie.Hash()
	refTrie.Hash()
	// Force the reference trie onto the sequential commit path.
	refTrie.uncommitted = 0

	haveRoot, haveNodes := paraTrie.Commit(true)
	wantRoot, wantNodes := refTrie.Commit(true)
	if haveRoot != wantRoot {
		t.Fatalf("have %x want %x", haveRoot, wantRoot)
	}
	have := printSet(haveNodes)
	want := printSet(wantNodes)
	if have != want {
		// Locate the first differing byte. Bound the scan by the shorter
		// string: the original looped to len(have) while indexing want[i],
		// which panics when want is a proper prefix of have.
		limit := min(len(have), len(want))
		i := 0
		for i < limit && have[i] == want[i] {
			i++
		}
		// Back up a little to show context preceding the divergence.
		if i > 100 {
			i -= 100
		}
		t.Fatalf("have != want\nhave %q\nwant %q", have[i:], want[i:])
	}
}
// printSet renders a NodeSet into a deterministic string: the owner hash,
// then every node sorted by path, then every leaf sorted by parent hash.
func printSet(set *trienode.NodeSet) string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "nodeset owner: %v\n", set.Owner)

	// Emit the nodes in sorted path order for a stable dump.
	paths := make([]string, 0, len(set.Nodes))
	for path := range set.Nodes {
		paths = append(paths, path)
	}
	sort.Strings(paths)
	for _, path := range paths {
		n := set.Nodes[path]
		if n.IsDeleted() {
			// Deletion
			fmt.Fprintf(&sb, " [-]: %x\n", path)
		} else {
			// Insertion or update
			fmt.Fprintf(&sb, " [+/*]: %x -> %v \n", path, n.Hash)
		}
	}
	// Order the leaves by their parent hash before dumping them.
	sort.Slice(set.Leaves, func(i, j int) bool {
		return bytes.Compare(set.Leaves[i].Parent[:], set.Leaves[j].Parent[:]) < 0
	})
	for _, leaf := range set.Leaves {
		fmt.Fprintf(&sb, "[leaf]: %v\n", leaf)
	}
	return sb.String()
}

@ -18,6 +18,7 @@ package trienode
import (
"fmt"
"maps"
"sort"
"strings"
@ -99,6 +100,23 @@ func (set *NodeSet) AddNode(path []byte, n *Node) {
set.Nodes[string(path)] = n
}
// MergeSet merges this 'set' with 'other'. It assumes that the sets are disjoint,
// and thus does not deduplicate data (count deletes, dedup leaves etc).
// An error is returned if the two sets belong to different trie owners.
func (set *NodeSet) MergeSet(other *NodeSet) error {
	if set.Owner != other.Owner {
		return fmt.Errorf("nodesets belonging to different owners are not mergeable %x-%x", set.Owner, other.Owner)
	}
	// Disjointness means no existing entry is overwritten here.
	maps.Copy(set.Nodes, other.Nodes)

	set.deletes += other.deletes
	set.updates += other.updates

	// Since we assume the sets are disjoint, we can safely append leaves
	// like this without deduplication.
	set.Leaves = append(set.Leaves, other.Leaves...)
	return nil
}
// Merge adds a set of nodes into the set.
func (set *NodeSet) Merge(owner common.Hash, nodes map[string]*Node) error {
if set.Owner != owner {

Loading…
Cancel
Save