core/rawdb: introduce freezer index repair mechanism

pull/30392/head
Gary Rong 4 months ago
parent c0b5d428a9
commit f9bdae4392
  1. 14
      core/rawdb/freezer_batch.go
  2. 125
      core/rawdb/freezer_table.go
  3. 4
      triedb/pathdb/disklayer.go
  4. 11
      triedb/pathdb/nodebuffer.go

@ -180,29 +180,21 @@ func (batch *freezerTableBatch) maybeCommit() error {
return nil
}
// commit writes the batched items to the backing freezerTable.
// commit writes the batched items to the backing freezerTable. Note both index
// and data file are not fsync'd after the file write, the data could be lost
// after the power failure.
func (batch *freezerTableBatch) commit() error {
// Write data. The head file is fsync'd after write to ensure the
// data is truly transferred to disk.
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
}
if err := batch.t.head.Sync(); err != nil {
return err
}
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]
// Write indices. The index file is fsync'd after write to ensure the
// data indexes are truly transferred to disk.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
if err := batch.t.index.Sync(); err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]

@ -26,6 +26,7 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
@ -219,7 +220,13 @@ func (t *freezerTable) repair() error {
return err
} // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
// Validate the index file as it might contain some garbage data after the
// power failures.
if err := t.repairIndex(); err != nil {
return err
}
// Retrieve the file sizes and prepare for truncation. Note the file size
// might be changed after index validation.
if stat, err = t.index.Stat(); err != nil {
return err
}
@ -364,6 +371,122 @@ func (t *freezerTable) repair() error {
return nil
}
// repairIndex validates the integrity of the index file. According to the design,
// the initial entry in the file denotes the earliest data file along with the
// count of deleted items. Following this, all subsequent entries in the file must
// be in order. This function identifies the first corrupted entry and truncates
// the index file at that position, dropping the entry and everything after it.
//
// Such corruption can be caused by a power failure. In the Linux kernel, the
// file metadata update and the data update are not necessarily performed at the
// same time. Typically, the metadata will be flushed/journaled ahead of the file
// data. Therefore, we make the pessimistic assumption that the file is first
// extended with invalid "garbage" data (normally zero bytes) and that afterwards
// the correct data replaces the garbage. As all the items in the index file are
// supposed to be in order, the leftover garbage must be truncated before the
// index data is utilized.
//
// It's important to note an exception that's unfortunately undetectable: when
// all index entries in the file are zero. Distinguishing whether they represent
// leftover garbage or if all items in the table have zero size is impossible.
// In such instances, the file will remain unchanged to prevent potential data
// loss or misinterpretation.
//
// An error is returned if the index file cannot be inspected or read, if a
// trailing partial entry is encountered, or if corruption is detected while
// the table is opened in read-only mode (in which case nothing is truncated).
func (t *freezerTable) repairIndex() error {
	// Retrieve the file size and prepare for validation
	stat, err := t.index.Stat()
	if err != nil {
		return err
	}
	size := stat.Size()

	// The index is scanned in batches. Never allocate more scratch space than
	// the file actually holds, so that small tables don't pay for a full-size
	// batch buffer on every open.
	batchSize := int64(indexEntrySize * 1024 * 1024)
	if size < batchSize {
		batchSize = size
	}
	var (
		start      = time.Now()
		buffer     = make([]byte, batchSize) // pre-allocate for batch reading
		prev       indexEntry                // last validated non-head entry
		head       indexEntry                // first entry of the index file
		readOffset int64                     // file offset of the first byte not loaded yet
		consumed   int64                     // file offset corresponding to buffer[0]

		// read returns the index entry located at the given file offset,
		// loading the next batch from disk once the current one is exhausted.
		// Offsets must be requested in increasing order.
		read = func(offset int64) (indexEntry, error) {
			if offset+indexEntrySize > size {
				return indexEntry{}, fmt.Errorf("slice bounds out of range, offset: %d, size: %d", offset, size)
			}
			if offset >= readOffset {
				// io.EOF is expected whenever the remaining file content
				// is shorter than the buffer, the byte count is verified
				// explicitly below instead.
				n, err := t.index.ReadAt(buffer, readOffset)
				if err != nil && !errors.Is(err, io.EOF) {
					return indexEntry{}, err
				}
				expect := batchSize
				if size-readOffset < batchSize {
					expect = size - readOffset
				}
				if expect != int64(n) {
					return indexEntry{}, fmt.Errorf("failed to read from index, want: %d, got: %d", expect, n)
				}
				consumed = readOffset
				readOffset += int64(n)
			}
			var entry indexEntry
			entry.unmarshalBinary(buffer[offset-consumed : offset-consumed+indexEntrySize])
			return entry, nil
		}
		// truncate cuts the index file at the given offset. In read-only
		// mode the file is left untouched and the corruption is reported
		// as an error instead.
		truncate = func(offset int64) error {
			if t.readonly {
				return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
			}
			if err := truncateFreezerFile(t.index, offset); err != nil {
				return err
			}
			log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
			return nil
		}
	)
	for offset := int64(0); offset < size; offset += indexEntrySize {
		entry, err := read(offset)
		if err != nil {
			return err
		}
		// The very first entry is the head, there is nothing to compare
		// it against yet.
		if offset == 0 {
			head = entry
			continue
		}
		// Ensure the first non-head entry refers to the earliest file
		if offset == indexEntrySize {
			if entry.filenum != head.filenum {
				return truncate(offset)
			}
			prev = entry
			continue
		}
		// Ensure two consecutive index items are in order
		if err := t.checkIndexItems(prev, entry); err != nil {
			return truncate(offset)
		}
		prev = entry
	}
	log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
	return nil
}
// checkIndexItems reports whether index entry b is a legal successor of index
// entry a. A non-nil error is returned if:
//   - b refers to a file that is neither a's file nor the one right after it
//   - both entries refer to the same file but b's data offset is below a's
//   - b refers to the next file but carries a zero data offset
func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
	switch b.filenum {
	case a.filenum:
		// Same data file, offsets must not go backwards.
		if b.offset < a.offset {
			return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
		}
	case a.filenum + 1:
		// Moved on to the next data file, a zero offset is not accepted.
		if b.offset == 0 {
			return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
		}
	default:
		// Neither the same file nor its direct successor.
		return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
	}
	return nil
}
// preopen opens all files that the freezer will need. This method should be called from an init-context,
// since it assumes that it doesn't have to bother with locking
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever

@ -202,7 +202,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
return nil, err
}
// To remove outdated history objects from the end, we set the 'tail' parameter
@ -267,7 +267,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
if dl.stale {
return errSnapshotStale
}
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
}
// size returns the approximate size of cached nodes in the disk layer.

@ -194,9 +194,9 @@ func (b *nodebuffer) empty() bool {
// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
b.limit = uint64(size)
return b.flush(db, clean, id, false)
return b.flush(db, freezer, clean, id, false)
}
// allocBatch returns a database batch with pre-allocated buffer.
@ -214,7 +214,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
if b.size <= b.limit && !force {
return nil
}
@ -227,6 +227,11 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
start = time.Now()
batch = b.allocBatch(db)
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
if err := freezer.Sync(); err != nil {
return err
}
nodes := writeNodes(batch, b.nodes, clean)
rawdb.WritePersistentStateID(batch, id)

Loading…
Cancel
Save