triedb/pathdb, eth: introduce Double-Buffer Mechanism in PathDB

Previously, PathDB used a single buffer to aggregate database writes,
which needed to be flushed atomically. However, flushing large amounts
of data (e.g., 256MB) caused significant overhead, often blocking
the system for around 3 seconds during the flush.

To mitigate this overhead and reduce performance spikes, a double-buffer
mechanism is introduced. When the active buffer fills up, it is marked
as frozen and a background flushing process is triggered. Meanwhile, a
new buffer is allocated for incoming writes, allowing operations to
continue uninterrupted.

This approach reduces system blocking times and provides flexibility
in adjusting buffer parameters for improved performance.
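
As an illustration of the pattern the commit message describes, here is a minimal, self-contained Go sketch of a freeze-and-flush double buffer. This is not the pathdb API; the doubleBuffer type, the persist helper, and the byte-slice payload are hypothetical stand-ins, but the swap-then-flush-in-background control flow mirrors the description above.

package main

import (
	"fmt"
	"time"
)

// doubleBuffer illustrates the scheme described above: when the live buffer
// fills up, it is frozen and flushed in the background while a fresh buffer
// keeps accepting writes. All names here are hypothetical, not pathdb's.
type doubleBuffer struct {
	live   []byte        // buffer currently accepting writes
	limit  int           // size threshold that triggers a freeze
	frozen []byte        // buffer being flushed in the background
	done   chan struct{} // closed once the frozen buffer is persisted
}

func (d *doubleBuffer) write(data []byte) {
	d.live = append(d.live, data...)
	if len(d.live) < d.limit {
		return
	}
	// Wait for any previous background flush before starting a new one,
	// so at most one flush is in flight at a time.
	if d.done != nil {
		<-d.done
	}
	d.frozen, d.live = d.live, nil // freeze the full buffer, start a fresh one
	d.done = make(chan struct{})
	go func(buf []byte, done chan struct{}) {
		defer close(done)
		persist(buf) // the expensive disk write happens off the hot path
	}(d.frozen, d.done)
}

// persist stands in for the real flush to disk.
func persist(buf []byte) {
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("flushed %d bytes\n", len(buf))
}

func main() {
	d := &doubleBuffer{limit: 1 << 20} // freeze at 1MB
	for i := 0; i < 64; i++ {
		d.write(make([]byte, 64*1024)) // writes keep flowing during flushes
	}
	if d.done != nil {
		<-d.done // drain the final background flush before exiting
	}
}

The key property is that write only blocks when a new freeze is requested while the previous flush is still running; steady-state writes proceed without waiting on disk.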
Branch: pull/30464/head
Author: Gary Rong, 4 months ago
Parent: 67a3b08795
Commit: 28ee3bc5b7
 core/state/snapshot/generate_test.go |  10
 core/state/statedb_test.go           |  47
 triedb/pathdb/buffer.go              |  85
 triedb/pathdb/database.go            |  10
 triedb/pathdb/difflayer.go           |   4
 triedb/pathdb/disklayer.go           | 145
 triedb/pathdb/journal.go             |   9
 triedb/pathdb/layertree.go           |   6
 8 files changed

@@ -240,6 +240,16 @@ func (t *testHelper) Commit() common.Hash {
 	}
 	t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, t.states)
 	t.triedb.Commit(root, false)
+
+	// re-open the trie database to ensure the frozen buffer
+	// is not referenced
+	config := &triedb.Config{}
+	if t.triedb.Scheme() == rawdb.PathScheme {
+		config.PathDB = &pathdb.Config{} // disable caching
+	} else {
+		config.HashDB = &hashdb.Config{} // disable caching
+	}
+	t.triedb = triedb.NewDatabase(t.triedb.Disk(), config)
 	return root
 }

@@ -978,19 +978,22 @@ func TestMissingTrieNodes(t *testing.T) {
 func testMissingTrieNodes(t *testing.T, scheme string) {
 	// Create an initial state with a few accounts
 	var (
-		tdb   *triedb.Database
-		memDb = rawdb.NewMemoryDatabase()
+		tdb    *triedb.Database
+		memDb  = rawdb.NewMemoryDatabase()
+		openDb = func() *triedb.Database {
+			if scheme == rawdb.PathScheme {
+				return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
+					CleanCacheSize:  0,
+					WriteBufferSize: 0,
+				}}) // disable caching
+			} else {
+				return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
+					CleanCacheSize: 0,
+				}}) // disable caching
+			}
+		}
 	)
-	if scheme == rawdb.PathScheme {
-		tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
-			CleanCacheSize:  0,
-			WriteBufferSize: 0,
-		}}) // disable caching
-	} else {
-		tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
-			CleanCacheSize: 0,
-		}}) // disable caching
-	}
+	tdb = openDb()
 	db := NewDatabase(tdb, nil)
 	var root common.Hash
@@ -1008,17 +1011,29 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
 		tdb.Commit(root, false)
 	}
 	// Create a new state on the old root
-	state, _ = New(root, db)
 	// Now we clear out the memdb
 	it := memDb.NewIterator(nil, nil)
 	for it.Next() {
 		k := it.Key()
 		// Leave the root intact
-		if !bytes.Equal(k, root[:]) {
-			t.Logf("key: %x", k)
-			memDb.Delete(k)
+		if scheme == rawdb.HashScheme {
+			if !bytes.Equal(k, root[:]) {
+				t.Logf("key: %x", k)
+				memDb.Delete(k)
+			}
+		}
+		if scheme == rawdb.PathScheme {
+			rk := k[len(rawdb.TrieNodeAccountPrefix):]
+			if len(rk) != 0 {
+				t.Logf("key: %x", k)
+				memDb.Delete(k)
+			}
 		}
 	}
+	tdb = openDb()
+	db = NewDatabase(tdb, nil)
+	state, _ = New(root, db)
 	balance := state.GetBalance(addr)
 	// The removed elem should lead to it returning zero balance
 	if exp, got := uint64(0), balance.Uint64(); got != exp {

@@ -17,6 +17,7 @@
 package pathdb
 
 import (
+	"errors"
 	"fmt"
 	"time"
@@ -37,6 +38,9 @@ type buffer struct {
 	limit  uint64    // The maximum memory allowance in bytes
 	nodes  *nodeSet  // Aggregated trie node set
 	states *stateSet // Aggregated state set
+
+	done     chan struct{} // notifier whether the content in buffer has been flushed or not
+	flushErr error         // error if any exception occurs during flushing
 }
 
 // newBuffer initializes the buffer with the provided states and trie nodes.
@@ -124,36 +128,61 @@ func (b *buffer) size() uint64 {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
-func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) error {
-	// Ensure the target state id is aligned with the internal counter.
-	head := rawdb.ReadPersistentStateID(db)
-	if head+b.layers != id {
-		return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
+func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) {
+	if b.done != nil {
+		panic("duplicated flush operation")
 	}
-	// Terminate the state snapshot generation if it's active
-	var (
-		start = time.Now()
-		batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
-	)
-	// Explicitly sync the state freezer, ensuring that all written
-	// data is transferred to disk before updating the key-value store.
-	if freezer != nil {
-		if err := freezer.Sync(); err != nil {
-			return err
-		}
-	}
-	nodes := b.nodes.write(batch, nodesCache)
-	rawdb.WritePersistentStateID(batch, id)
+	b.done = make(chan struct{})
+
+	go func() {
+		defer func() {
+			close(b.done)
+		}()
+		// Ensure the target state id is aligned with the internal counter.
+		head := rawdb.ReadPersistentStateID(db)
+		if head+b.layers != id {
+			b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
+			return
+		}
+		// Terminate the state snapshot generation if it's active
+		var (
+			start = time.Now()
+			batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
+		)
+		// Explicitly sync the state freezer, ensuring that all written
+		// data is transferred to disk before updating the key-value store.
+		if freezer != nil {
+			if err := freezer.Sync(); err != nil {
+				b.flushErr = err
+				return
+			}
+		}
+		nodes := b.nodes.write(batch, nodesCache)
+		rawdb.WritePersistentStateID(batch, id)
 
-	// Flush all mutations in a single batch
-	size := batch.ValueSize()
-	if err := batch.Write(); err != nil {
-		return err
+		// Flush all mutations in a single batch
+		size := batch.ValueSize()
+		if err := batch.Write(); err != nil {
+			b.flushErr = err
+			return
+		}
+		// The content in the frozen buffer is kept for consequent state access,
+		// TODO (rjl493456442) measure the gc overhead for holding this struct.
+		commitBytesMeter.Mark(int64(size))
+		commitNodesMeter.Mark(int64(nodes))
+		commitTimeTimer.UpdateSince(start)
+		log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
+	}()
+}
+
+// waitFlush blocks until the buffer has been fully flushed and returns any
+// stored errors that occurred during the process.
+func (b *buffer) waitFlush() error {
+	if b.done == nil {
+		return errors.New("the buffer is not frozen")
 	}
-	commitBytesMeter.Mark(int64(size))
-	commitNodesMeter.Mark(int64(nodes))
-	commitTimeTimer.UpdateSince(start)
-	b.reset()
-	log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
-	return nil
+	<-b.done
+	return b.flushErr
 }
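
The flush/waitFlush pair above propagates a failure out of the background goroutine by storing it in flushErr before closing done; callers read the error only after the channel is closed, so no lock is needed. A stripped-down sketch of that handshake (generic names, not the pathdb types):

package main

import (
	"errors"
	"fmt"
)

// task mirrors the handshake: the background goroutine records its error
// before closing done, and callers read the error only after <-done, so the
// plain field access is safe without extra locking.
type task struct {
	done chan struct{}
	err  error
}

func (t *task) start(work func() error) {
	if t.done != nil {
		panic("duplicated flush operation")
	}
	t.done = make(chan struct{})
	go func() {
		defer close(t.done) // runs after t.err is assigned
		t.err = work()
	}()
}

func (t *task) wait() error {
	if t.done == nil {
		return errors.New("not started")
	}
	<-t.done
	return t.err
}

func main() {
	var t task
	t.start(func() error { return errors.New("disk full") })
	fmt.Println(t.wait()) // prints: disk full
}

Because close(t.done) is deferred, it is guaranteed to run after t.err is assigned, and the Go memory model makes that write visible to any goroutine that has received from done.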

@@ -377,7 +377,7 @@ func (db *Database) Enable(root common.Hash) error {
 	}
 	// Re-construct a new disk layer backed by persistent state
 	// with **empty clean cache and node buffer**.
-	db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)))
+	db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil))
 
 	// Re-enable the database as the final step.
 	db.waitSync = false
@@ -476,7 +476,13 @@
 	db.readOnly = true
 
 	// Release the memory held by clean cache.
-	db.tree.bottom().resetCache()
+	disk := db.tree.bottom()
+	if disk.frozen != nil {
+		if err := disk.frozen.waitFlush(); err != nil {
+			return err
+		}
+	}
+	disk.resetCache()
 
 	// Close the attached state history freezer.
 	if db.freezer == nil {

@@ -156,7 +156,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes *no
 }
 
 // persist flushes the diff layer and all its parent layers to disk layer.
-func (dl *diffLayer) persist(force bool) (layer, error) {
+func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
 	if parent, ok := dl.parentLayer().(*diffLayer); ok {
 		// Hold the lock to prevent any read operation until the new
 		// parent is linked correctly.
@@ -183,7 +183,7 @@ func (dl *diffLayer) size() uint64 {
 
 // diffToDisk merges a bottom-most diff into the persistent disk layer underneath
 // it. The method will panic if called onto a non-bottom-most diff layer.
-func diffToDisk(layer *diffLayer, force bool) (layer, error) {
+func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
 	disk, ok := layer.parentLayer().(*diskLayer)
 	if !ok {
 		panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))

@@ -34,13 +34,14 @@ type diskLayer struct {
 	id     uint64           // Immutable, corresponding state id
 	db     *Database        // Path-based trie database
 	nodes  *fastcache.Cache // GC friendly memory cache of clean nodes
-	buffer *buffer          // Dirty buffer to aggregate writes of nodes and states
+	buffer *buffer          // Live buffer to aggregate writes
+	frozen *buffer          // Frozen node buffer waiting for flushing
 	stale  bool             // Signals that the layer became stale (state progressed)
 	lock   sync.RWMutex     // Lock used to protect stale flag
 }
 
 // newDiskLayer creates a new disk layer based on the passing arguments.
-func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer) *diskLayer {
+func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer {
 	// Initialize a clean cache if the memory allowance is not zero
 	// or reuse the provided cache if it is not nil (inherited from
 	// the original disk layer).
@@ -53,6 +54,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Ca
 		db:     db,
 		nodes:  nodes,
 		buffer: buffer,
+		frozen: frozen,
 	}
 }
@@ -101,16 +103,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
 	if dl.stale {
 		return nil, common.Hash{}, nil, errSnapshotStale
 	}
-	// Try to retrieve the trie node from the not-yet-written
-	// node buffer first. Note the buffer is lock free since
-	// it's impossible to mutate the buffer before tagging the
-	// layer as stale.
-	n, found := dl.buffer.node(owner, path)
-	if found {
-		dirtyNodeHitMeter.Mark(1)
-		dirtyNodeReadMeter.Mark(int64(len(n.Blob)))
-		dirtyNodeHitDepthHist.Update(int64(depth))
-		return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
+	// Try to retrieve the trie node from the not-yet-written node buffer first
+	// (both the live one and the frozen one). Note the buffer is lock free since
+	// it's impossible to mutate the buffer before tagging the layer as stale.
+	for _, buffer := range []*buffer{dl.buffer, dl.frozen} {
+		if buffer != nil {
+			n, found := buffer.node(owner, path)
+			if found {
+				dirtyNodeHitMeter.Mark(1)
+				dirtyNodeReadMeter.Mark(int64(len(n.Blob)))
+				dirtyNodeHitDepthHist.Update(int64(depth))
+				return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
+			}
+		}
 	}
 	dirtyNodeMissMeter.Mark(1)
@@ -134,6 +139,11 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
 	} else {
 		blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
 	}
+	// Store the resolved data in the clean cache. The background buffer flusher
+	// may also write to the clean cache concurrently, but two writers cannot
+	// write the same item with different content. If the item already exists,
+	// it will be found in the frozen buffer, eliminating the need to check the
+	// database.
 	if dl.nodes != nil && len(blob) > 0 {
 		dl.nodes.Set(key, blob)
 		cleanNodeWriteMeter.Mark(int64(len(blob)))
@@ -152,24 +162,27 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
 	if dl.stale {
 		return nil, errSnapshotStale
 	}
-	// Try to retrieve the account from the not-yet-written
-	// node buffer first. Note the buffer is lock free since
-	// it's impossible to mutate the buffer before tagging the
-	// layer as stale.
-	blob, found := dl.buffer.account(hash)
-	if found {
-		dirtyStateHitMeter.Mark(1)
-		dirtyStateReadMeter.Mark(int64(len(blob)))
-		dirtyStateHitDepthHist.Update(int64(depth))
-		if len(blob) == 0 {
-			stateAccountInexMeter.Mark(1)
-		} else {
-			stateAccountExistMeter.Mark(1)
+	// Try to retrieve the trie node from the not-yet-written node buffer first
+	// (both the live one and the frozen one). Note the buffer is lock free since
+	// it's impossible to mutate the buffer before tagging the layer as stale.
+	for _, buffer := range []*buffer{dl.buffer, dl.frozen} {
+		if buffer != nil {
+			blob, found := buffer.account(hash)
+			if found {
+				dirtyStateHitMeter.Mark(1)
+				dirtyStateReadMeter.Mark(int64(len(blob)))
+				dirtyStateHitDepthHist.Update(int64(depth))
+				if len(blob) == 0 {
+					stateAccountInexMeter.Mark(1)
+				} else {
+					stateAccountExistMeter.Mark(1)
+				}
+				return blob, nil
+			}
 		}
-		return blob, nil
 	}
-	dirtyStateMissMeter.Mark(1)
+	dirtyNodeMissMeter.Mark(1)
 	// TODO(rjl493456442) support persistent state retrieval
 	return nil, errors.New("not supported")
@@ -188,22 +201,27 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
 	if dl.stale {
 		return nil, errSnapshotStale
 	}
-	// Try to retrieve the storage slot from the not-yet-written
-	// node buffer first. Note the buffer is lock free since
-	// it's impossible to mutate the buffer before tagging the
-	// layer as stale.
-	if blob, found := dl.buffer.storage(accountHash, storageHash); found {
-		dirtyStateHitMeter.Mark(1)
-		dirtyStateReadMeter.Mark(int64(len(blob)))
-		dirtyStateHitDepthHist.Update(int64(depth))
-		if len(blob) == 0 {
-			stateStorageInexMeter.Mark(1)
-		} else {
-			stateStorageExistMeter.Mark(1)
+	// Try to retrieve the trie node from the not-yet-written node buffer first
+	// (both the live one and the frozen one). Note the buffer is lock free since
+	// it's impossible to mutate the buffer before tagging the layer as stale.
+	for _, buffer := range []*buffer{dl.buffer, dl.frozen} {
+		if buffer != nil {
+			if blob, found := buffer.storage(accountHash, storageHash); found {
+				dirtyStateHitMeter.Mark(1)
+				dirtyStateReadMeter.Mark(int64(len(blob)))
+				dirtyStateHitDepthHist.Update(int64(depth))
+				if len(blob) == 0 {
+					stateStorageInexMeter.Mark(1)
+				} else {
+					stateStorageExistMeter.Mark(1)
+				}
+				return blob, nil
+			}
 		}
-		return blob, nil
 	}
 	dirtyStateMissMeter.Mark(1)
 	// TODO(rjl493456442) support persistent state retrieval
@@ -250,7 +268,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	// Mark the diskLayer as stale before applying any mutations on top.
 	dl.stale = true
 
-	// Store the root->id lookup afterwards. All stored lookups are identified
+	// Store the root->id lookup afterward. All stored lookups are identified
 	// by the **unique** state root. It's impossible that in the same chain
 	// blocks are not adjacent but have the same root.
 	if dl.id == 0 {
@@ -262,18 +280,40 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	// truncation) surpasses the persisted state ID, we take the necessary action
 	// of forcibly committing the cached dirty states to ensure that the persisted
 	// state ID remains higher.
-	if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
+	persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
+	if !force && persistedID < oldest {
 		force = true
 	}
-	// Merge the trie nodes and flat states of the bottom-most diff layer into the
-	// buffer as the combined layer.
+	// Merge the nodes of the bottom-most diff layer into the buffer as the combined one
 	combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet)
 	if combined.full() || force {
-		if err := combined.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID()); err != nil {
-			return nil, err
+		// Wait until the previous frozen buffer is fully flushed
+		if dl.frozen != nil {
+			if err := dl.frozen.waitFlush(); err != nil {
+				return nil, err
+			}
 		}
+		// Release the frozen buffer and the internally referenced maps will
+		// be reclaimed by GC.
+		dl.frozen = nil
+
+		// Freeze the live buffer and schedule background flushing
+		dl.frozen = combined
+		dl.frozen.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID())
+
+		// Block until the frozen buffer is fully flushed out if the oldest history
+		// surpasses the persisted state ID.
+		if persistedID < oldest {
+			if err := dl.frozen.waitFlush(); err != nil {
+				return nil, err
+			}
+		}
+		combined = newBuffer(dl.db.config.WriteBufferSize, nil, nil, 0)
 	}
-	ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined)
+	// Construct a new disk layer by merging the nodes from the provided diff
+	// layer, and flush the content in disk layer if there are too many nodes
+	// cached. The clean cache is inherited from the original disk layer.
+	ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined, dl.frozen)
 
 	// To remove outdated history objects from the end, we set the 'tail' parameter
 	// to 'oldest-1' due to the offset between the freezer index and the history ID.
@@ -337,6 +377,15 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
 			return nil, err
 		}
 	} else {
+		// Block until the frozen buffer is fully flushed
+		if dl.frozen != nil {
+			if err := dl.frozen.waitFlush(); err != nil {
+				return nil, err
+			}
+			// Unset the frozen buffer if it exists, otherwise these "reverted"
+			// states will still be accessible after revert in frozen buffer.
+			dl.frozen = nil
+		}
 		batch := dl.db.diskdb.NewBatch()
 		writeNodes(batch, nodes, dl.nodes)
 		rawdb.WritePersistentStateID(batch, dl.id-1)
@@ -344,7 +393,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
 			log.Crit("Failed to write states", "err", err)
 		}
 	}
-	return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer), nil
+	return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer, dl.frozen), nil
 }
 
 // size returns the approximate size of cached nodes in the disk layer.

@@ -109,7 +109,7 @@ func (db *Database) loadLayers() layer {
 		log.Info("Failed to load journal, discard it", "err", err)
 	}
 	// Return single layer with persistent state.
-	return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))
+	return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil)
 }
 
 // loadDiskLayer reads the binary blob from the layer journal, reconstructing
@@ -141,7 +141,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
 	if err := states.decode(r); err != nil {
 		return nil, err
 	}
-	return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil
+	return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored), nil), nil
 }
 
 // loadDiffLayer reads the next sections of a layer journal, reconstructing a new
@@ -243,6 +243,11 @@ func (db *Database) Journal(root common.Hash) error {
 		return fmt.Errorf("triedb layer [%#x] missing", root)
 	}
 	disk := db.tree.bottom()
+	if disk.frozen != nil {
+		if err := disk.frozen.waitFlush(); err != nil {
+			return err
+		}
+	}
 	if l, ok := l.(*diffLayer); ok {
 		log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
 	} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)

@@ -130,6 +130,12 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
 		if err != nil {
 			return err
 		}
+		// Block until the frozen buffer is fully flushed
+		if base.frozen != nil {
+			if err := base.frozen.waitFlush(); err != nil {
+				return err
+			}
+		}
 		// Replace the entire layer tree with the flat base
 		tree.layers = map[common.Hash]layer{base.rootHash(): base}
 		return nil
