triedb/pathdb, eth: introduce Double-Buffer Mechanism in PathDB

Previously, PathDB used a single buffer to aggregate database writes,
which needed to be flushed atomically. However, flushing large amounts
of data (e.g., 256MB) caused significant overhead, often blocking
the system for around 3 seconds during the flush.

To mitigate this overhead and reduce performance spikes, a double-buffer
mechanism is introduced. When the active buffer fills up, it is marked
as frozen and a background flushing process is triggered. Meanwhile, a
new buffer is allocated for incoming writes, allowing operations to
continue uninterrupted.

This approach reduces system blocking times and provides flexibility
in adjusting buffer parameters for improved performance.
Gary Rong 2 months ago
parent 8032b63f16
commit 20b4ffd3bc
  1. 10
      core/state/snapshot/generate_test.go
  2. 23
      core/state/statedb_test.go
  3. 5
      eth/handler.go
  4. 11
      triedb/database.go
  5. 29
      triedb/pathdb/database.go
  6. 4
      triedb/pathdb/difflayer.go
  7. 2
      triedb/pathdb/difflayer_test.go
  8. 77
      triedb/pathdb/disklayer.go
  9. 9
      triedb/pathdb/journal.go
  10. 6
      triedb/pathdb/layertree.go
  11. 46
      triedb/pathdb/nodebuffer.go

@ -224,6 +224,16 @@ func (t *testHelper) Commit() common.Hash {
}
t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, nil)
t.triedb.Commit(root, false)
// re-open the trie database to ensure the frozen buffer
// is not referenced
config := &triedb.Config{}
if t.triedb.Scheme() == rawdb.PathScheme {
config.PathDB = &pathdb.Config{} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
}
t.triedb = triedb.NewDatabase(t.triedb.Disk(), config)
return root
}

@ -978,17 +978,20 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
var (
tdb *triedb.Database
memDb = rawdb.NewMemoryDatabase()
)
openDb = func() *triedb.Database {
if scheme == rawdb.PathScheme {
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
DirtyCacheSize: 0,
}}) // disable caching
} else {
tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
CleanCacheSize: 0,
}}) // disable caching
}
}
)
tdb = openDb()
db := NewDatabase(tdb, nil)
var root common.Hash
@ -1006,17 +1009,29 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
tdb.Commit(root, false)
}
// Create a new state on the old root
state, _ = New(root, db)
// Now we clear out the memdb
it := memDb.NewIterator(nil, nil)
for it.Next() {
k := it.Key()
// Leave the root intact
if scheme == rawdb.HashScheme {
if !bytes.Equal(k, root[:]) {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
if scheme == rawdb.PathScheme {
rk := k[len(rawdb.TrieNodeAccountPrefix):]
if len(rk) != 0 {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
}
tdb = openDb()
db = NewDatabase(tdb, nil)
state, _ = New(root, db)
balance := state.GetBalance(addr)
// The removed elem should lead to it returning zero balance
if exp, got := uint64(0), balance.Uint64(); got != exp {

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -41,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/triedb/pathdb"
)
const (
@ -558,7 +556,4 @@ func (h *handler) enableSyncedFeatures() {
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
}
}

@ -314,17 +314,6 @@ func (db *Database) Journal(root common.Hash) error {
return pdb.Journal(root)
}
// SetBufferSize sets the node buffer size to the provided value(in bytes).
// It's only supported by path-based database and will return an error for
// others.
func (db *Database) SetBufferSize(size int) error {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return errors.New("not supported")
}
return pdb.SetBufferSize(size)
}
// IsVerkle returns the indicator if the database is holding a verkle tree.
func (db *Database) IsVerkle() bool {
return db.config.IsVerkle

@ -44,12 +44,12 @@ const (
// support is 4GB, node will panic if batch size exceeds this limit.
maxBufferSize = 256 * 1024 * 1024
// DefaultBufferSize is the default memory allowance of node buffer
// defaultBufferSize is the default memory allowance of node buffer
// that aggregates the writes from above until it's flushed into the
// disk. It's meant to be used once the initial sync is finished.
// Do not increase the buffer size arbitrarily, otherwise the system
// pause time will increase when the database writes happen.
DefaultBufferSize = 64 * 1024 * 1024
defaultBufferSize = 64 * 1024 * 1024
)
var (
@ -111,7 +111,7 @@ func (c *Config) sanitize() *Config {
var Defaults = &Config{
StateHistory: params.FullImmutabilityThreshold,
CleanCacheSize: defaultCleanSize,
DirtyCacheSize: DefaultBufferSize,
DirtyCacheSize: defaultBufferSize,
}
// ReadOnly is the config in order to open database in read only mode.
@ -341,7 +341,7 @@ func (db *Database) Enable(root common.Hash) error {
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil))
// Re-enable the database as the final step.
db.waitSync = false
@ -440,7 +440,13 @@ func (db *Database) Close() error {
db.readOnly = true
// Release the memory held by clean cache.
db.tree.bottom().resetCache()
disk := db.tree.bottom()
if disk.frozen != nil {
if err := disk.frozen.flushed(); err != nil {
return err
}
}
disk.resetCache()
// Close the attached state history freezer.
if db.freezer == nil {
@ -478,19 +484,6 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
return inited
}
// SetBufferSize sets the node buffer size to the provided value(in bytes).
func (db *Database) SetBufferSize(size int) error {
db.lock.Lock()
defer db.lock.Unlock()
if size > maxBufferSize {
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize))
size = maxBufferSize
}
db.bufferSize = size
return db.tree.bottom().setBufferSize(db.bufferSize)
}
// modifyAllowed returns the indicator if mutation is allowed. This function
// assumes the db.lock is already held.
func (db *Database) modifyAllowed() error {

@ -125,7 +125,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
}
// persist flushes the diff layer and all its parent layers to disk layer.
func (dl *diffLayer) persist(force bool) (layer, error) {
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
if parent, ok := dl.parentLayer().(*diffLayer); ok {
// Hold the lock to prevent any read operation until the new
// parent is linked correctly.
@ -147,7 +147,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {
// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
disk, ok := layer.parentLayer().(*diskLayer)
if !ok {
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))

@ -30,7 +30,7 @@ import (
func emptyLayer() *diskLayer {
return &diskLayer{
db: New(rawdb.NewMemoryDatabase(), nil, false),
buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
buffer: newNodeBuffer(defaultBufferSize, nil, 0),
}
}

@ -36,12 +36,13 @@ type diskLayer struct {
db *Database // Path-based trie database
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
buffer *nodebuffer // Node buffer to aggregate writes
frozen *nodebuffer // Frozen node buffer waiting for flushing
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag
}
// newDiskLayer creates a new disk layer based on the passing arguments.
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer, frozen *nodebuffer) *diskLayer {
// Initialize a clean cache if the memory allowance is not zero
// or reuse the provided cache if it is not nil (inherited from
// the original disk layer).
@ -54,6 +55,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.C
db: db,
cleans: cleans,
buffer: buffer,
frozen: frozen,
}
}
@ -102,17 +104,20 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
if dl.stale {
return nil, common.Hash{}, nil, errSnapshotStale
}
// Try to retrieve the trie node from the not-yet-written
// node buffer first. Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the
// layer as stale.
n, found := dl.buffer.node(owner, path)
// Try to retrieve the trie node from the not-yet-written node buffer first
// (both the live one and the frozen one). Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the layer as stale.
for _, buffer := range []*nodebuffer{dl.buffer, dl.frozen} {
if buffer != nil {
n, found := buffer.node(owner, path)
if found {
dirtyHitMeter.Mark(1)
dirtyReadMeter.Mark(int64(len(n.Blob)))
dirtyNodeHitDepthHist.Update(int64(depth))
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
}
}
}
dirtyMissMeter.Mark(1)
// Try to retrieve the trie node from the clean memory cache
@ -182,7 +187,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// Mark the diskLayer as stale before applying any mutations on top.
dl.stale = true
// Store the root->id lookup afterwards. All stored lookups are identified
// Store the root->id lookup afterward. All stored lookups are identified
// by the **unique** state root. It's impossible that in the same chain
// blocks are not adjacent but have the same root.
if dl.id == 0 {
@ -190,21 +195,43 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
}
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
// In a unique scenario where the ID of the oldest history object (after tail
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty nodes to ensure that the persisted
// of forcibly committing the cached dirty states to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
if !force && persistedID < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
// Merge the nodes of the bottom-most diff layer into the buffer as the combined one
combined := dl.buffer.commit(bottom.nodes)
if combined.full() || force {
// Wait until the previous frozen buffer is fully flushed
if dl.frozen != nil {
if err := dl.frozen.flushed(); err != nil {
return nil, err
}
}
dl.frozen = nil
// Freeze the live buffer and schedule background flushing
dl.frozen = combined
dl.frozen.flush(dl.db.diskdb, dl.cleans, bottom.stateID())
// Block until the frozen buffer is fully flushed out if the oldest history
// surpasses the persisted state ID.
if persistedID < oldest {
if err := dl.frozen.flushed(); err != nil {
return nil, err
}
}
combined = newNodeBuffer(dl.db.bufferSize, nil, 0)
}
// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined, dl.frozen)
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
@ -249,6 +276,13 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
return nil, err
}
} else {
// Block until the frozen buffer is fully flushed
if dl.frozen != nil {
if err := dl.frozen.flushed(); err != nil {
return nil, err
}
dl.frozen = nil // unset the frozen buffer
}
batch := dl.db.diskdb.NewBatch()
writeNodes(batch, nodes, dl.cleans)
rawdb.WritePersistentStateID(batch, dl.id-1)
@ -256,18 +290,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
log.Crit("Failed to write states", "err", err)
}
}
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
}
// setBufferSize sets the node buffer size to the provided value.
func (dl *diskLayer) setBufferSize(size int) error {
dl.lock.RLock()
defer dl.lock.RUnlock()
if dl.stale {
return errSnapshotStale
}
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer, dl.frozen), nil
}
// size returns the approximate size of cached nodes in the disk layer.

@ -136,7 +136,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil)
}
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
@ -176,7 +176,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}
// Calculate the internal state transitions by id difference.
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored), nil)
return base, nil
}
@ -342,6 +342,11 @@ func (db *Database) Journal(root common.Hash) error {
return fmt.Errorf("triedb layer [%#x] missing", root)
}
disk := db.tree.bottom()
if disk.frozen != nil {
if err := disk.frozen.flushed(); err != nil {
return err
}
}
if l, ok := l.(*diffLayer); ok {
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)

@ -131,6 +131,12 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
if err != nil {
return err
}
// Block until the frozen buffer is fully flushed
if base.frozen != nil {
if err := base.frozen.flushed(); err != nil {
return err
}
}
// Replace the entire layer tree with the flat base
tree.layers = map[common.Hash]layer{base.rootHash(): base}
return nil

@ -39,6 +39,9 @@ type nodebuffer struct {
size uint64 // The size of aggregated writes
limit uint64 // The maximum memory allowance in bytes
nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
done chan struct{} // notifier whether the content in buffer has been flushed or not
flushErr error // error if any exception occurs during flushing
}
// newNodeBuffer initializes the node buffer with the provided nodes.
@ -192,11 +195,10 @@ func (b *nodebuffer) empty() bool {
return b.layers == 0
}
// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
b.limit = uint64(size)
return b.flush(db, clean, id, false)
// full returns an indicator if the size of accumulated data exceeds the configured
// threshold.
func (b *nodebuffer) full() bool {
return b.size > b.limit
}
// allocBatch returns a database batch with pre-allocated buffer.
@ -212,16 +214,19 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
return db.NewBatchWithSize((metasize + int(b.size)) * 11 / 10) // extra 10% for potential pebble internal stuff
}
// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
if b.size <= b.limit && !force {
return nil
}
// Ensure the target state id is aligned with the internal counter.
// flush persists the in-memory dirty trie node into the disk. Note, all data must be written atomically.
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) {
b.done = make(chan struct{})
go func() {
defer func() {
close(b.done)
}()
// Error out if the state id is aligned with the internal counter
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
return
}
var (
start = time.Now()
@ -233,14 +238,21 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
// Flush all mutations in a single batch
size := batch.ValueSize()
if err := batch.Write(); err != nil {
return err
b.flushErr = err
return
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitTimeTimer.UpdateSince(start)
log.Debug("Persisted pathdb nodes", "nodes", len(b.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
b.reset()
return nil
log.Info("Persisted pathdb nodes", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
}()
}
// flushed blocks until the buffer is fully flushed and also returns the memorized
// error which occurs within the flushing.
func (b *nodebuffer) flushed() error {
<-b.done
return b.flushErr
}
// writeNodes writes the trie nodes into the provided database batch.

Loading…
Cancel
Save