triedb/pathdb: clean cache for states

pull/30159/head
Gary Rong 4 weeks ago
parent ebc045469c
commit 72f40bfe85
  1. triedb/pathdb/buffer.go (6 changed lines)
  2. triedb/pathdb/database.go (2 changed lines)
  3. triedb/pathdb/disklayer.go (94 changed lines)
  4. triedb/pathdb/flush.go (21 changed lines)
  5. triedb/pathdb/generate.go (2 changed lines)
  6. triedb/pathdb/iterator_test.go (4 changed lines)
  7. triedb/pathdb/journal.go (4 changed lines)
  8. triedb/pathdb/metrics.go (14 changed lines)
  9. triedb/pathdb/states.go (5 changed lines)
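In short, this commit splits the disk layer's single clean cache (`cleans`) into two fastcache instances: `nodes` for clean trie nodes and `states` for clean flat-state entries (accounts and storage slots), each sized at half of the configured `CleanCacheSize`. Account and storage reads on the disk layer now consult the clean state cache before falling back to the database, `writeStates` keeps that cache coherent during buffer flushes, and the clean-cache metrics are split accordingly. The sketch below (not part of the commit) illustrates the cache split and the key scheme used for state entries; `newCleanCaches` and `storageKey` are hypothetical helper names, while the halving of the allowance and the keys mirror `newDiskLayer` and `writeStates` in the diff below.

```go
// Minimal, self-contained sketch of the split clean-cache layout introduced
// here. The keys follow the diff: account entries are keyed by the account
// hash, storage entries by accountHash ++ storageHash.
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/ethereum/go-ethereum/common"
)

// newCleanCaches splits the clean-cache allowance evenly between trie nodes
// and flat states, as newDiskLayer does after this change.
func newCleanCaches(cleanCacheSize int) (nodes, states *fastcache.Cache) {
	if cleanCacheSize == 0 {
		return nil, nil
	}
	return fastcache.New(cleanCacheSize / 2), fastcache.New(cleanCacheSize / 2)
}

// storageKey builds the clean-state cache key for a storage slot.
func storageKey(accountHash, storageHash common.Hash) []byte {
	return append(accountHash.Bytes(), storageHash.Bytes()...)
}

func main() {
	_, states := newCleanCaches(64 * 1024 * 1024)

	acct := common.HexToHash("0x01")
	slot := common.HexToHash("0x02")

	states.Set(acct[:], []byte{0xaa})                // clean account entry
	states.Set(storageKey(acct, slot), []byte{0xbb}) // clean storage entry

	if blob, found := states.HasGet(nil, acct[:]); found {
		fmt.Printf("account cached: %x\n", blob)
	}
}
```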

@@ -133,7 +133,7 @@ func (b *buffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, progress []byte, clean *fastcache.Cache, id uint64) error {
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error {
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
@@ -144,8 +144,8 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, progress []byte
start = time.Now()
batch = b.allocBatch(db)
)
nodes := b.nodes.write(batch, b.nodes.nodes, clean)
accounts, slots := b.states.write(db, batch, progress)
nodes := b.nodes.write(batch, b.nodes.nodes, nodesCache)
accounts, slots := b.states.write(db, batch, progress, statesCache)
rawdb.WritePersistentStateID(batch, id)
rawdb.WriteSnapshotRoot(batch, root)

@@ -42,7 +42,7 @@ const (
// Too large node buffer will cause the system to pause for a long
// time when write happens. Also, the largest batch that pebble can
// support is 4GB, node will panic if batch size exceeds this limit.
maxBufferSize = 512 * 1024 * 1024
maxBufferSize = 256 * 1024 * 1024
// defaultBufferSize is the default memory allowance of node buffer
// that aggregates the writes from above until it's flushed into the

@@ -33,7 +33,8 @@ type diskLayer struct {
root common.Hash // Immutable, root hash to which this layer was made for
id uint64 // Immutable, corresponding state id
db *Database // Path-based trie database
cleans *fastcache.Cache // GC friendly memory cache of clean nodes and states
nodes *fastcache.Cache // GC friendly memory cache of clean nodes
states *fastcache.Cache // GC friendly memory cache of clean states
buffer *buffer // Dirty buffer to aggregate writes of nodes and states
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag and genMarker
@@ -43,18 +44,22 @@ type diskLayer struct {
}
// newDiskLayer creates a new disk layer based on the passing arguments.
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *buffer) *diskLayer {
// Initialize a clean cache if the memory allowance is not zero
// or reuse the provided cache if it is not nil (inherited from
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer {
// Initialize the clean caches if the memory allowance is not zero
// or reuse the provided caches if they are not nil (inherited from
// the original disk layer).
if cleans == nil && db.config.CleanCacheSize != 0 {
cleans = fastcache.New(db.config.CleanCacheSize)
if nodes == nil && db.config.CleanCacheSize != 0 {
nodes = fastcache.New(db.config.CleanCacheSize / 2)
}
if states == nil && db.config.CleanCacheSize != 0 {
states = fastcache.New(db.config.CleanCacheSize / 2)
}
return &diskLayer{
root: root,
id: id,
db: db,
cleans: cleans,
nodes: nodes,
states: states,
buffer: buffer,
}
}
@@ -129,13 +134,13 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
defer h.release()
key := cacheKey(owner, path)
if dl.cleans != nil {
if blob := dl.cleans.Get(nil, key); len(blob) > 0 {
cleanHitMeter.Mark(1)
cleanReadMeter.Mark(int64(len(blob)))
if dl.nodes != nil {
if blob := dl.nodes.Get(nil, key); len(blob) > 0 {
cleanNodeHitMeter.Mark(1)
cleanNodeReadMeter.Mark(int64(len(blob)))
return blob, h.hash(blob), &nodeLoc{loc: locCleanCache, depth: depth}, nil
}
cleanMissMeter.Mark(1)
cleanNodeMissMeter.Mark(1)
}
// Try to retrieve the trie node from the disk.
var blob []byte
@@ -144,9 +149,9 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
} else {
blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
}
if dl.cleans != nil && len(blob) > 0 {
dl.cleans.Set(key, blob)
cleanWriteMeter.Mark(int64(len(blob)))
if dl.nodes != nil && len(blob) > 0 {
dl.nodes.Set(key, blob)
cleanNodeWriteMeter.Mark(int64(len(blob)))
}
return blob, h.hash(blob), &nodeLoc{loc: locDiskLayer, depth: depth}, nil
}
@@ -181,8 +186,26 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
if marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the account from the memory cache
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, hash[:]); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
return blob, nil
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the account from the disk.
return rawdb.ReadAccountSnapshot(dl.db.diskdb, hash), nil
blob = rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
if dl.states != nil {
dl.states.Set(hash[:], blob)
if n := len(blob); n > 0 {
cleanStateWriteMeter.Mark(int64(n))
} else {
cleanStateInexMeter.Mark(1)
}
}
return blob, nil
}
// storage directly retrieves the storage data associated with a particular hash,
@@ -213,8 +236,26 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
if marker != nil && bytes.Compare(key, marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the storage slot from the memory cache
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, key); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
return blob, nil
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the storage slot from the disk.
return rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash), nil
blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash)
if dl.states != nil {
dl.states.Set(key, blob)
if n := len(blob); n > 0 {
cleanStateWriteMeter.Mark(int64(n))
} else {
cleanStateInexMeter.Mark(1)
}
}
return blob, nil
}
// update implements the layer interface, returning a new diff layer on top
@@ -285,7 +326,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
}
// Flush the trie data and state data into the combined buffer. Any state after
// the progress marker will be ignored, as the generator will pick it up later.
if err := combined.flush(bottom.root, dl.db.diskdb, progress, dl.cleans, bottom.stateID()); err != nil {
if err := combined.flush(bottom.root, dl.db.diskdb, progress, dl.nodes, dl.states, bottom.stateID()); err != nil {
return nil, err
}
// Relaunch the state snapshot generation if it's not done yet
@@ -294,7 +335,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
log.Info("Resumed state snapshot generation", "root", bottom.root)
}
}
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined)
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined)
// Link the generator if snapshot is not yet completed
if dl.generator != nil && !dl.generator.completed() {
@@ -351,7 +392,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
if err != nil {
return nil, err
}
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer)
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
// Link the generator if it exists
if dl.generator != nil {
@@ -366,8 +407,8 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
progress = dl.generator.progressMarker()
}
batch := dl.db.diskdb.NewBatch()
writeNodes(batch, nodes, dl.cleans)
writeStates(dl.db.diskdb, batch, progress, nil, accounts, storages)
writeNodes(batch, nodes, dl.nodes)
writeStates(dl.db.diskdb, batch, progress, nil, accounts, storages, dl.states)
rawdb.WritePersistentStateID(batch, dl.id-1)
rawdb.WriteSnapshotRoot(batch, h.meta.parent)
if err := batch.Write(); err != nil {
@@ -375,7 +416,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
}
// Link the generator and resume generation if the snapshot is not yet
// fully completed.
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer)
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
if dl.generator != nil && !dl.generator.completed() {
ndl.generator = dl.generator
ndl.generator.run(h.meta.parent)
@@ -404,8 +445,11 @@ func (dl *diskLayer) resetCache() {
if dl.stale {
return
}
if dl.cleans != nil {
dl.cleans.Reset()
if dl.nodes != nil {
dl.nodes.Reset()
}
if dl.states != nil {
dl.states.Reset()
}
}
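The account and storage read paths above follow the same read-through pattern: check the clean state cache, fall back to disk on a miss, and cache whatever comes back, including an empty blob, so that the non-existence of an entry is served from memory on later lookups (counted by cleanStateInexMeter). A condensed sketch of that pattern is shown below; readDisk is a placeholder for the rawdb snapshot readers, and the whole program is an illustration, not the geth code.

```go
// Condensed sketch of the read-through clean state cache used by account()
// and storage() above. readDisk stands in for rawdb.ReadAccountSnapshot /
// rawdb.ReadStorageSnapshot.
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func readState(states *fastcache.Cache, key []byte, readDisk func([]byte) []byte) []byte {
	if states != nil {
		// HasGet distinguishes "cached as empty" from "not cached at all".
		if blob, found := states.HasGet(nil, key); found {
			return blob
		}
	}
	blob := readDisk(key)
	if states != nil {
		// Cache the result even when empty, so repeated lookups of
		// non-existent entries are also answered from memory.
		states.Set(key, blob)
	}
	return blob
}

func main() {
	states := fastcache.New(32 * 1024 * 1024)
	disk := map[string][]byte{"acct": {0x01}}
	readDisk := func(k []byte) []byte { return disk[string(k)] }

	fmt.Printf("%x\n", readState(states, []byte("acct"), readDisk))    // miss, then cached
	fmt.Printf("%x\n", readState(states, []byte("acct"), readDisk))    // hit
	fmt.Printf("%x\n", readState(states, []byte("missing"), readDisk)) // non-existence cached
}
```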

@@ -66,7 +66,7 @@ func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.No
}
// writeStates flushes state mutations into the provided database batch as a whole.
func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte) (int, int) {
func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) {
var (
accounts int
slots int
@@ -78,11 +78,16 @@ func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, de
}
rawdb.DeleteAccountSnapshot(batch, addrHash)
accounts += 1
if clean != nil {
clean.Set(addrHash[:], nil)
}
it := rawdb.IterateStorageSnapshots(db, addrHash)
for it.Next() {
batch.Delete(it.Key())
slots += 1
if clean != nil {
clean.Del(it.Key()[1:])
}
}
it.Release()
}
@@ -94,8 +99,14 @@ func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, de
accounts += 1
if len(blob) == 0 {
rawdb.DeleteAccountSnapshot(batch, addrHash)
if clean != nil {
clean.Set(addrHash[:], nil)
}
} else {
rawdb.WriteAccountSnapshot(batch, addrHash, blob)
if clean != nil {
clean.Set(addrHash[:], blob)
}
}
}
for addrHash, storages := range storageData {
@@ -113,8 +124,14 @@ func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, de
slots += 1
if len(blob) == 0 {
rawdb.DeleteStorageSnapshot(batch, addrHash, storageHash)
if clean != nil {
clean.Set(append(addrHash[:], storageHash[:]...), nil)
}
} else {
rawdb.WriteStorageSnapshot(batch, addrHash, storageHash, blob)
if clean != nil {
clean.Set(append(addrHash[:], storageHash[:]...), blob)
}
}
}
}
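The cache maintenance in writeStates above appears to follow two conventions: entries whose deletion is known directly (destructed accounts, empty account or storage blobs) are written into the cache as empty values, so the deletion itself stays cached, while storage entries discovered only by iterating the on-disk snapshot of a destructed account are evicted with Del. The snippet below (an illustration, not geth code) shows the observable difference between the two fastcache operations.

```go
// Set with a nil value caches the fact that an entry is gone; Del removes the
// entry so the next lookup is a plain cache miss. The keys are illustrative.
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	states := fastcache.New(32 * 1024 * 1024)

	states.Set([]byte("deleted-account"), nil) // deletion cached as an empty blob
	states.Del([]byte("stale-storage-slot"))   // entry evicted outright

	if blob, found := states.HasGet(nil, []byte("deleted-account")); found {
		fmt.Printf("deletion cached, %d bytes\n", len(blob))
	}
	if _, found := states.HasGet(nil, []byte("stale-storage-slot")); !found {
		fmt.Println("evicted entry is a cache miss")
	}
}
```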

@@ -175,7 +175,7 @@ func generateSnapshot(triedb *Database, root common.Hash) *diskLayer {
stats = &generatorStats{start: time.Now()}
genMarker = []byte{} // Initialized but empty!
)
dl := newDiskLayer(root, 0, triedb, nil, newBuffer(triedb.bufferSize, nil, nil, 0))
dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.bufferSize, nil, nil, 0))
dl.generator = newGenerator(triedb.diskdb, false, genMarker, stats)
dl.generator.run(root)
log.Info("Started snapshot generation", "root", root)

@@ -139,7 +139,7 @@ func TestAccountIteratorBasics(t *testing.T) {
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(db, batch, nil)
states.write(db, batch, nil, nil)
batch.Write()
it = newDiskAccountIterator(db, common.Hash{})
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
@@ -179,7 +179,7 @@ func TestStorageIteratorBasics(t *testing.T) {
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(db, batch, nil)
states.write(db, batch, nil, nil)
batch.Write()
for account := range accounts {
it := newDiskStorageIterator(db, account, common.Hash{})

@@ -151,7 +151,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.bufferSize, nil, nil, 0))
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.bufferSize, nil, nil, 0))
}
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
@@ -183,7 +183,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
if err := states.decode(r); err != nil {
return nil, err
}
return newDiskLayer(root, id, db, nil, newBuffer(db.bufferSize, &nodes, &states, id-stored)), nil
return newDiskLayer(root, id, db, nil, nil, newBuffer(db.bufferSize, &nodes, &states, id-stored)), nil
}
// loadDiffLayer reads the next sections of a layer journal, reconstructing a new

@@ -19,10 +19,16 @@ package pathdb
import "github.com/ethereum/go-ethereum/metrics"
var (
cleanHitMeter = metrics.NewRegisteredMeter("pathdb/clean/hit", nil)
cleanMissMeter = metrics.NewRegisteredMeter("pathdb/clean/miss", nil)
cleanReadMeter = metrics.NewRegisteredMeter("pathdb/clean/read", nil)
cleanWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/write", nil)
cleanNodeHitMeter = metrics.NewRegisteredMeter("pathdb/clean/node/hit", nil)
cleanNodeMissMeter = metrics.NewRegisteredMeter("pathdb/clean/node/miss", nil)
cleanNodeReadMeter = metrics.NewRegisteredMeter("pathdb/clean/node/read", nil)
cleanNodeWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/node/write", nil)
cleanStateHitMeter = metrics.NewRegisteredMeter("pathdb/clean/state/hit", nil)
cleanStateMissMeter = metrics.NewRegisteredMeter("pathdb/clean/state/miss", nil)
cleanStateReadMeter = metrics.NewRegisteredMeter("pathdb/clean/state/read", nil)
cleanStateWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/state/write", nil)
cleanStateInexMeter = metrics.NewRegisteredMeter("pathdb/clean/state/inex", nil)
dirtyNodeHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/hit/node", nil)
dirtyNodeMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/miss/node", nil)

@@ -23,6 +23,7 @@ import (
"slices"
"sync"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
@@ -534,8 +535,8 @@ func (s *stateSet) decode(r *rlp.Stream) error {
}
// write flushes state mutations into the provided database batch as a whole.
func (s *stateSet) write(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte) (int, int) {
return writeStates(db, batch, genMarker, s.destructSet, s.accountData, s.storageData)
func (s *stateSet) write(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, clean *fastcache.Cache) (int, int) {
return writeStates(db, batch, genMarker, s.destructSet, s.accountData, s.storageData, clean)
}
// reset clears all cached state data, including any optional sorted lists that
