From 72f40bfe858c57ff218bf5c00cca3b6ffaa15ada Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 22 Aug 2024 15:05:28 +0800 Subject: [PATCH] triedb/pathdb: clean cache for states --- triedb/pathdb/buffer.go | 6 +-- triedb/pathdb/database.go | 2 +- triedb/pathdb/disklayer.go | 94 +++++++++++++++++++++++++--------- triedb/pathdb/flush.go | 21 +++++++- triedb/pathdb/generate.go | 2 +- triedb/pathdb/iterator_test.go | 4 +- triedb/pathdb/journal.go | 4 +- triedb/pathdb/metrics.go | 14 +++-- triedb/pathdb/states.go | 5 +- 9 files changed, 110 insertions(+), 42 deletions(-) diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index 670db47c42..5a61ff705d 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -133,7 +133,7 @@ func (b *buffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch { // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. -func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, progress []byte, clean *fastcache.Cache, id uint64) error { +func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error { // Ensure the target state id is aligned with the internal counter. head := rawdb.ReadPersistentStateID(db) if head+b.layers != id { @@ -144,8 +144,8 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, progress []byte start = time.Now() batch = b.allocBatch(db) ) - nodes := b.nodes.write(batch, b.nodes.nodes, clean) - accounts, slots := b.states.write(db, batch, progress) + nodes := b.nodes.write(batch, b.nodes.nodes, nodesCache) + accounts, slots := b.states.write(db, batch, progress, statesCache) rawdb.WritePersistentStateID(batch, id) rawdb.WriteSnapshotRoot(batch, root) diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 4c78825e67..c8e0684f46 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -42,7 +42,7 @@ const ( // Too large node buffer will cause the system to pause for a long // time when write happens. Also, the largest batch that pebble can // support is 4GB, node will panic if batch size exceeds this limit. - maxBufferSize = 512 * 1024 * 1024 + maxBufferSize = 256 * 1024 * 1024 // defaultBufferSize is the default memory allowance of node buffer // that aggregates the writes from above until it's flushed into the diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index e63575c39c..e74be8c503 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -33,7 +33,8 @@ type diskLayer struct { root common.Hash // Immutable, root hash to which this layer was made for id uint64 // Immutable, corresponding state id db *Database // Path-based trie database - cleans *fastcache.Cache // GC friendly memory cache of clean nodes and states + nodes *fastcache.Cache // GC friendly memory cache of clean nodes + states *fastcache.Cache // GC friendly memory cache of clean states buffer *buffer // Dirty buffer to aggregate writes of nodes and states stale bool // Signals that the layer became stale (state progressed) lock sync.RWMutex // Lock used to protect stale flag and genMarker @@ -43,18 +44,22 @@ type diskLayer struct { } // newDiskLayer creates a new disk layer based on the passing arguments. 
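The newDiskLayer change that follows halves the single configured CleanCacheSize allowance between the two caches rather than sizing them independently. A minimal standalone sketch of that allocation pattern, using only the public fastcache API; the helper name is illustrative, not part of the patch:

    package main

    import (
    	"fmt"

    	"github.com/VictoriaMetrics/fastcache"
    )

    // newCleanCaches halves one memory allowance between a trie-node
    // cache and a flat-state cache, mirroring newDiskLayer below.
    func newCleanCaches(allowance int) (nodes, states *fastcache.Cache) {
    	if allowance != 0 {
    		nodes = fastcache.New(allowance / 2)
    		states = fastcache.New(allowance / 2)
    	}
    	return nodes, states
    }

    func main() {
    	nodes, states := newCleanCaches(32 * 1024 * 1024)
    	nodes.Set([]byte("node-path"), []byte("rlp-blob"))
    	states.Set([]byte("account-hash"), []byte("slim-account"))
    	fmt.Println(nodes.Has([]byte("node-path")), states.Has([]byte("account-hash")))
    }

An even split is the simplest policy; nothing in the patch tunes the ratio to the relative node and state traffic.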
-func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *buffer) *diskLayer {
-	// Initialize a clean cache if the memory allowance is not zero
-	// or reuse the provided cache if it is not nil (inherited from
+func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer {
+	// Initialize the clean caches if the memory allowance is not zero
+	// or reuse the provided caches if they are not nil (inherited from
 	// the original disk layer).
-	if cleans == nil && db.config.CleanCacheSize != 0 {
-		cleans = fastcache.New(db.config.CleanCacheSize)
+	if nodes == nil && db.config.CleanCacheSize != 0 {
+		nodes = fastcache.New(db.config.CleanCacheSize / 2)
+	}
+	if states == nil && db.config.CleanCacheSize != 0 {
+		states = fastcache.New(db.config.CleanCacheSize / 2)
 	}
 	return &diskLayer{
 		root:   root,
 		id:     id,
 		db:     db,
-		cleans: cleans,
+		nodes:  nodes,
+		states: states,
 		buffer: buffer,
 	}
 }
@@ -129,13 +134,13 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
 	defer h.release()

 	key := cacheKey(owner, path)
-	if dl.cleans != nil {
-		if blob := dl.cleans.Get(nil, key); len(blob) > 0 {
-			cleanHitMeter.Mark(1)
-			cleanReadMeter.Mark(int64(len(blob)))
+	if dl.nodes != nil {
+		if blob := dl.nodes.Get(nil, key); len(blob) > 0 {
+			cleanNodeHitMeter.Mark(1)
+			cleanNodeReadMeter.Mark(int64(len(blob)))
 			return blob, h.hash(blob), &nodeLoc{loc: locCleanCache, depth: depth}, nil
 		}
-		cleanMissMeter.Mark(1)
+		cleanNodeMissMeter.Mark(1)
 	}
 	// Try to retrieve the trie node from the disk.
 	var blob []byte
@@ -144,9 +149,9 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
 	} else {
 		blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
 	}
-	if dl.cleans != nil && len(blob) > 0 {
-		dl.cleans.Set(key, blob)
-		cleanWriteMeter.Mark(int64(len(blob)))
+	if dl.nodes != nil && len(blob) > 0 {
+		dl.nodes.Set(key, blob)
+		cleanNodeWriteMeter.Mark(int64(len(blob)))
 	}
 	return blob, h.hash(blob), &nodeLoc{loc: locDiskLayer, depth: depth}, nil
 }
@@ -181,8 +186,26 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
 	if marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 {
 		return nil, errNotCoveredYet
 	}
+	// Try to retrieve the account from the memory cache
+	if dl.states != nil {
+		if blob, found := dl.states.HasGet(nil, hash[:]); found {
+			cleanStateHitMeter.Mark(1)
+			cleanStateReadMeter.Mark(int64(len(blob)))
+			return blob, nil
+		}
+		cleanStateMissMeter.Mark(1)
+	}
 	// Try to retrieve the account from the disk.
-	return rawdb.ReadAccountSnapshot(dl.db.diskdb, hash), nil
+	blob := rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
+	if dl.states != nil {
+		dl.states.Set(hash[:], blob)
+		if n := len(blob); n > 0 {
+			cleanStateWriteMeter.Mark(int64(n))
+		} else {
+			cleanStateInexMeter.Mark(1)
+		}
+	}
+	return blob, nil
 }

 // storage directly retrieves the storage data associated with a particular hash,
@@ -213,8 +236,26 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
 	if marker != nil && bytes.Compare(key, marker) > 0 {
 		return nil, errNotCoveredYet
 	}
+	// Try to retrieve the storage slot from the memory cache
+	if dl.states != nil {
+		if blob, found := dl.states.HasGet(nil, key); found {
+			cleanStateHitMeter.Mark(1)
+			cleanStateReadMeter.Mark(int64(len(blob)))
+			return blob, nil
+		}
+		cleanStateMissMeter.Mark(1)
+	}
 	// Try to retrieve the storage slot from the disk.
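Both the account read above and the storage read that resumes below follow one cache-aside shape: probe the clean state cache, fall back to disk, then backfill the cache. fastcache.HasGet is used instead of Get because it distinguishes a cached empty value (state known to be absent) from a plain miss. A condensed sketch of that shape, with the metrics elided and disk standing in for the rawdb snapshot readers:

    // cachedStateRead is a schematic of the lookup in account/storage:
    // a found-but-empty cache entry means "known absent", and even nil
    // disk results are cached to short-circuit repeated lookups.
    func cachedStateRead(cache *fastcache.Cache, key []byte, disk func([]byte) []byte) []byte {
    	if cache != nil {
    		if blob, found := cache.HasGet(nil, key); found {
    			return blob // hit; may be empty for known-absent state
    		}
    	}
    	blob := disk(key)
    	if cache != nil {
    		cache.Set(key, blob) // len(blob) == 0 caches the absence
    	}
    	return blob
    }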
- return rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash), nil + blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash) + if dl.states != nil { + dl.states.Set(key, blob) + if n := len(blob); n > 0 { + cleanStateWriteMeter.Mark(int64(n)) + } else { + cleanStateInexMeter.Mark(1) + } + } + return blob, nil } // update implements the layer interface, returning a new diff layer on top @@ -285,7 +326,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { } // Flush the trie data and state data into the combined buffer. Any state after // the progress marker will be ignored, as the generator will pick it up later. - if err := combined.flush(bottom.root, dl.db.diskdb, progress, dl.cleans, bottom.stateID()); err != nil { + if err := combined.flush(bottom.root, dl.db.diskdb, progress, dl.nodes, dl.states, bottom.stateID()); err != nil { return nil, err } // Relaunch the state snapshot generation if it's not done yet @@ -294,7 +335,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { log.Info("Resumed state snapshot generation", "root", bottom.root) } } - ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined) + ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined) // Link the generator if snapshot is not yet completed if dl.generator != nil && !dl.generator.completed() { @@ -351,7 +392,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { if err != nil { return nil, err } - ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer) + ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer) // Link the generator if it exists if dl.generator != nil { @@ -366,8 +407,8 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { progress = dl.generator.progressMarker() } batch := dl.db.diskdb.NewBatch() - writeNodes(batch, nodes, dl.cleans) - writeStates(dl.db.diskdb, batch, progress, nil, accounts, storages) + writeNodes(batch, nodes, dl.nodes) + writeStates(dl.db.diskdb, batch, progress, nil, accounts, storages, dl.states) rawdb.WritePersistentStateID(batch, dl.id-1) rawdb.WriteSnapshotRoot(batch, h.meta.parent) if err := batch.Write(); err != nil { @@ -375,7 +416,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { } // Link the generator and resume generation if the snapshot is not yet // fully completed. - ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer) + ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer) if dl.generator != nil && !dl.generator.completed() { ndl.generator = dl.generator ndl.generator.run(h.meta.parent) @@ -404,8 +445,11 @@ func (dl *diskLayer) resetCache() { if dl.stale { return } - if dl.cleans != nil { - dl.cleans.Reset() + if dl.nodes != nil { + dl.nodes.Reset() + } + if dl.states != nil { + dl.states.Reset() } } diff --git a/triedb/pathdb/flush.go b/triedb/pathdb/flush.go index 3f924d84bf..19151605e5 100644 --- a/triedb/pathdb/flush.go +++ b/triedb/pathdb/flush.go @@ -66,7 +66,7 @@ func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.No } // writeStates flushes state mutations into the provided database batch as a whole. 
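The writeStates rework that follows threads the clean state cache through the flush path: every account or slot written into the batch is mirrored into the cache, and deletions are recorded as explicit empty values so the cache never serves stale state. A schematic of that write-through rule; the helper is hypothetical, not a function in the patch:

    // mirror applies one flushed state entry to the clean cache. An
    // empty blob is cached as an explicit empty value, matching the
    // clean.Set(key, nil) calls in the hunks below.
    func mirror(clean *fastcache.Cache, key, blob []byte) {
    	if clean == nil {
    		return
    	}
    	clean.Set(key, blob) // empty blob marks a deletion
    }

The one exception below is the destruct loop, which drops iterated slots with clean.Del; note that it.Key()[1:] strips the one-byte rawdb table prefix so the cache key matches the one used on the read path.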
-func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte) (int, int) { +func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) { var ( accounts int slots int @@ -78,11 +78,16 @@ func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, de } rawdb.DeleteAccountSnapshot(batch, addrHash) accounts += 1 - + if clean != nil { + clean.Set(addrHash[:], nil) + } it := rawdb.IterateStorageSnapshots(db, addrHash) for it.Next() { batch.Delete(it.Key()) slots += 1 + if clean != nil { + clean.Del(it.Key()[1:]) + } } it.Release() } @@ -94,8 +99,14 @@ func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, de accounts += 1 if len(blob) == 0 { rawdb.DeleteAccountSnapshot(batch, addrHash) + if clean != nil { + clean.Set(addrHash[:], nil) + } } else { rawdb.WriteAccountSnapshot(batch, addrHash, blob) + if clean != nil { + clean.Set(addrHash[:], blob) + } } } for addrHash, storages := range storageData { @@ -113,8 +124,14 @@ func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, de slots += 1 if len(blob) == 0 { rawdb.DeleteStorageSnapshot(batch, addrHash, storageHash) + if clean != nil { + clean.Set(append(addrHash[:], storageHash[:]...), nil) + } } else { rawdb.WriteStorageSnapshot(batch, addrHash, storageHash, blob) + if clean != nil { + clean.Set(append(addrHash[:], storageHash[:]...), blob) + } } } } diff --git a/triedb/pathdb/generate.go b/triedb/pathdb/generate.go index 84e3c5b87b..21d46a8d1d 100644 --- a/triedb/pathdb/generate.go +++ b/triedb/pathdb/generate.go @@ -175,7 +175,7 @@ func generateSnapshot(triedb *Database, root common.Hash) *diskLayer { stats = &generatorStats{start: time.Now()} genMarker = []byte{} // Initialized but empty! ) - dl := newDiskLayer(root, 0, triedb, nil, newBuffer(triedb.bufferSize, nil, nil, 0)) + dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.bufferSize, nil, nil, 0)) dl.generator = newGenerator(triedb.diskdb, false, genMarker, stats) dl.generator.run(root) log.Info("Started snapshot generation", "root", root) diff --git a/triedb/pathdb/iterator_test.go b/triedb/pathdb/iterator_test.go index b17414d416..2836297d1a 100644 --- a/triedb/pathdb/iterator_test.go +++ b/triedb/pathdb/iterator_test.go @@ -139,7 +139,7 @@ func TestAccountIteratorBasics(t *testing.T) { db := rawdb.NewMemoryDatabase() batch := db.NewBatch() - states.write(db, batch, nil) + states.write(db, batch, nil, nil) batch.Write() it = newDiskAccountIterator(db, common.Hash{}) verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator @@ -179,7 +179,7 @@ func TestStorageIteratorBasics(t *testing.T) { db := rawdb.NewMemoryDatabase() batch := db.NewBatch() - states.write(db, batch, nil) + states.write(db, batch, nil, nil) batch.Write() for account := range accounts { it := newDiskStorageIterator(db, account, common.Hash{}) diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index f8f1bf8982..34bac56bcd 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -151,7 +151,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. 
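The loadLayers return below, like generateSnapshot and loadDiskLayer elsewhere in the patch, now passes nil, nil for the two caches, deferring their construction to newDiskLayer. The clean state cache keys themselves appear to mirror the snapshot key layout; a small sketch under that assumption (helper names are illustrative):

    import "github.com/ethereum/go-ethereum/common"

    // Keys used by the clean state cache: a 32-byte account hash for
    // accounts, and the 64-byte concatenation of account and slot
    // hashes for storage, i.e. snapshot database keys minus their
    // one-byte table prefix.
    func accountKey(addrHash common.Hash) []byte {
    	return addrHash.Bytes()
    }

    func storageKey(addrHash, slotHash common.Hash) []byte {
    	return append(addrHash.Bytes(), slotHash.Bytes()...)
    }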
- return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.bufferSize, nil, nil, 0)) + return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.bufferSize, nil, nil, 0)) } // loadDiskLayer reads the binary blob from the layer journal, reconstructing @@ -183,7 +183,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { if err := states.decode(r); err != nil { return nil, err } - return newDiskLayer(root, id, db, nil, newBuffer(db.bufferSize, &nodes, &states, id-stored)), nil + return newDiskLayer(root, id, db, nil, nil, newBuffer(db.bufferSize, &nodes, &states, id-stored)), nil } // loadDiffLayer reads the next sections of a layer journal, reconstructing a new diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index e7ccd65639..11e34b1347 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -19,10 +19,16 @@ package pathdb import "github.com/ethereum/go-ethereum/metrics" var ( - cleanHitMeter = metrics.NewRegisteredMeter("pathdb/clean/hit", nil) - cleanMissMeter = metrics.NewRegisteredMeter("pathdb/clean/miss", nil) - cleanReadMeter = metrics.NewRegisteredMeter("pathdb/clean/read", nil) - cleanWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/write", nil) + cleanNodeHitMeter = metrics.NewRegisteredMeter("pathdb/clean/node/hit", nil) + cleanNodeMissMeter = metrics.NewRegisteredMeter("pathdb/clean/node/miss", nil) + cleanNodeReadMeter = metrics.NewRegisteredMeter("pathdb/clean/node/read", nil) + cleanNodeWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/node/write", nil) + + cleanStateHitMeter = metrics.NewRegisteredMeter("pathdb/clean/state/hit", nil) + cleanStateMissMeter = metrics.NewRegisteredMeter("pathdb/clean/state/miss", nil) + cleanStateReadMeter = metrics.NewRegisteredMeter("pathdb/clean/state/read", nil) + cleanStateWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/state/write", nil) + cleanStateInexMeter = metrics.NewRegisteredMeter("pathdb/clean/state/inex", nil) dirtyNodeHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/hit/node", nil) dirtyNodeMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/miss/node", nil) diff --git a/triedb/pathdb/states.go b/triedb/pathdb/states.go index 4766504a92..661ec571de 100644 --- a/triedb/pathdb/states.go +++ b/triedb/pathdb/states.go @@ -23,6 +23,7 @@ import ( "slices" "sync" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" @@ -534,8 +535,8 @@ func (s *stateSet) decode(r *rlp.Stream) error { } // write flushes state mutations into the provided database batch as a whole. -func (s *stateSet) write(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte) (int, int) { - return writeStates(db, batch, genMarker, s.destructSet, s.accountData, s.storageData) +func (s *stateSet) write(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, clean *fastcache.Cache) (int, int) { + return writeStates(db, batch, genMarker, s.destructSet, s.accountData, s.storageData, clean) } // reset clears all cached state data, including any optional sorted lists that
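With the meters split per cache, and the new pathdb/clean/state/inex meter counting disk reads that found nothing and were cached as empty, node and state hit rates can now be tracked independently. A small read-side helper, assuming the default go-ethereum metrics registry and the meter names introduced above:

    import "github.com/ethereum/go-ethereum/metrics"

    // hitRatio derives a hit rate from a paired hit/miss meter, e.g.
    // hitRatio("pathdb/clean/node") or hitRatio("pathdb/clean/state").
    func hitRatio(prefix string) float64 {
    	hit := metrics.GetOrRegisterMeter(prefix+"/hit", nil).Snapshot().Count()
    	miss := metrics.GetOrRegisterMeter(prefix+"/miss", nil).Snapshot().Count()
    	if total := hit + miss; total > 0 {
    		return float64(hit) / float64(total)
    	}
    	return 0
    }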