diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go
index 84a63a4518..76c46a070b 100644
--- a/core/rawdb/freezer_batch.go
+++ b/core/rawdb/freezer_batch.go
@@ -180,29 +180,21 @@ func (batch *freezerTableBatch) maybeCommit() error {
 	return nil
 }
 
-// commit writes the batched items to the backing freezerTable.
+// commit writes the batched items to the backing freezerTable. Note that
+// neither the index file nor the data file is fsync'd after the write, so
+// the data could be lost after a power failure.
 func (batch *freezerTableBatch) commit() error {
-	// Write data. The head file is fsync'd after write to ensure the
-	// data is truly transferred to disk.
 	_, err := batch.t.head.Write(batch.dataBuffer)
 	if err != nil {
 		return err
 	}
-	if err := batch.t.head.Sync(); err != nil {
-		return err
-	}
 	dataSize := int64(len(batch.dataBuffer))
 	batch.dataBuffer = batch.dataBuffer[:0]
 
-	// Write indices. The index file is fsync'd after write to ensure the
-	// data indexes are truly transferred to disk.
 	_, err = batch.t.index.Write(batch.indexBuffer)
 	if err != nil {
 		return err
 	}
-	if err := batch.t.index.Sync(); err != nil {
-		return err
-	}
 	indexSize := int64(len(batch.indexBuffer))
 	batch.indexBuffer = batch.indexBuffer[:0]
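The committed batch above is no longer fsync'd per write: commit() only pushes bytes into the OS page cache, and durability is re-established later by an explicit sync at a safe barrier (see the pathdb change further down). In isolation the pattern looks like the following minimal sketch; the os.File calls are standard Go APIs, but the file name and the 6-byte record size are illustrative assumptions, not the freezer's actual format.

package main

import (
	"log"
	"os"
)

func main() {
	// Hypothetical append-only table file; stand-in for a freezer table.
	f, err := os.OpenFile("table.rdat", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Batched appends are cheap: they land in the OS page cache. After a
	// power failure, anything written since the last Sync may be lost or
	// only partially on disk, which is exactly the trade-off commit() makes.
	for i := 0; i < 1024; i++ {
		if _, err := f.Write(make([]byte, 6)); err != nil {
			log.Fatal(err)
		}
	}
	// One explicit barrier instead of one fsync per batch: after Sync
	// returns, everything appended above is durable.
	if err := f.Sync(); err != nil {
		log.Fatal(err)
	}
}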
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index 4b9d510e82..84bfa0727a 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -26,6 +26,7 @@ import (
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
@@ -219,7 +220,13 @@ func (t *freezerTable) repair() error {
 			return err
 		} // New file can't trigger this path
 	}
-	// Retrieve the file sizes and prepare for truncation
+	// Validate the index file as it might contain some garbage data after a
+	// power failure.
+	if err := t.repairIndex(); err != nil {
+		return err
+	}
+	// Retrieve the file sizes and prepare for truncation. Note the file size
+	// might be changed by the index validation.
 	if stat, err = t.index.Stat(); err != nil {
 		return err
 	}
@@ -364,6 +371,122 @@ func (t *freezerTable) repair() error {
 	return nil
 }
 
+// repairIndex validates the integrity of the index file. By design, the initial
+// entry in the file denotes the earliest data file along with the count of
+// deleted items. All subsequent entries in the file must be in order. This
+// function identifies any corrupted entries and truncates the items occurring
+// after the corruption point.
+//
+// Such corruption can be caused by a power failure. In the Linux kernel, file
+// metadata updates and data updates are not necessarily performed at the same
+// time; typically, the metadata is flushed/journaled ahead of the file data.
+// Therefore, we make the pessimistic assumption that the file is first
+// extended with invalid "garbage" data (normally zero bytes) and that the
+// correct data replaces the garbage afterwards. As all the items in the index
+// file are supposed to be in order, any leftover garbage must be truncated
+// before the index data is used.
+//
+// It's important to note an exception that's unfortunately undetectable: when
+// all index entries in the file are zero. Distinguishing whether they represent
+// leftover garbage or whether all items in the table have zero size is
+// impossible. In such instances, the file is left unchanged to prevent
+// potential data loss or misinterpretation.
+func (t *freezerTable) repairIndex() error {
+	// Retrieve the file size and prepare for validation
+	stat, err := t.index.Stat()
+	if err != nil {
+		return err
+	}
+	size := stat.Size()
+
+	var (
+		start      = time.Now()
+		batchSize  = int64(indexEntrySize * 1024 * 1024)
+		buffer     = make([]byte, batchSize) // pre-allocate for batch reading
+		prev       indexEntry
+		head       indexEntry
+		readOffset int64
+		consumed   int64
+
+		read = func(offset int64) (indexEntry, error) {
+			if offset+indexEntrySize > size {
+				return indexEntry{}, fmt.Errorf("slice bounds out of range, offset: %d, size: %d", offset, size)
+			}
+			if offset >= readOffset {
+				n, err := t.index.ReadAt(buffer, readOffset)
+				if err != nil && !errors.Is(err, io.EOF) {
+					return indexEntry{}, err
+				}
+				expect := batchSize
+				if size-readOffset < batchSize {
+					expect = size - readOffset
+				}
+				if expect != int64(n) {
+					return indexEntry{}, fmt.Errorf("failed to read from index, want: %d, got: %d", expect, n)
+				}
+				consumed = readOffset
+				readOffset += int64(n)
+			}
+			var entry indexEntry
+			entry.unmarshalBinary(buffer[offset-consumed : offset-consumed+indexEntrySize])
+			return entry, nil
+		}
+		truncate = func(offset int64) error {
+			if t.readonly {
+				return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
+			}
+			if err := truncateFreezerFile(t.index, offset); err != nil {
+				return err
+			}
+			log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
+			return nil
+		}
+	)
+	for offset := int64(0); offset < size; offset += indexEntrySize {
+		entry, err := read(offset)
+		if err != nil {
+			return err
+		}
+		if offset == 0 {
+			head = entry
+			continue
+		}
+		// ensure the first non-head index entry denotes the earliest file
+		if offset == indexEntrySize {
+			if entry.filenum != head.filenum {
+				return truncate(offset)
+			}
+			prev = entry
+			continue
+		}
+		// ensure two consecutive index items are in order
+		if err := t.checkIndexItems(prev, entry); err != nil {
+			return truncate(offset)
+		}
+		prev = entry
+	}
+	log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
+	return nil
+}
+
+// checkIndexItems checks the validity of two consecutive index items. The
+// second item is regarded as invalid if:
+// - its file number is neither the same as nor one greater than the previous one
+// - its data offset is lower than the previous offset within the same data file
+// - its data offset is zero although the file number has increased
+func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
+	if b.filenum != a.filenum && b.filenum != a.filenum+1 {
+		return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
+	}
+	if b.filenum == a.filenum && b.offset < a.offset {
+		return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
+	}
+	if b.filenum == a.filenum+1 && b.offset == 0 {
+		return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
+	}
+	return nil
+}
+
 // preopen opens all files that the freezer will need. This method should be called from an init-context,
 // since it assumes that it doesn't have to bother with locking
 // The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
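As a complement to the diff above, the pairwise rules enforced by checkIndexItems can be exercised against raw index bytes. This is a standalone sketch, assuming the freezer's 6-byte index entry encoding (a big-endian uint16 file number followed by a big-endian uint32 end-of-data offset) and ignoring the special head entry that repairIndex treats separately.

package main

import (
	"encoding/binary"
	"fmt"
)

type entry struct {
	filenum uint32
	offset  uint32
}

// parse decodes one 6-byte index entry (assumed layout, see lead-in).
func parse(b []byte) entry {
	return entry{
		filenum: uint32(binary.BigEndian.Uint16(b[:2])),
		offset:  binary.BigEndian.Uint32(b[2:6]),
	}
}

// valid mirrors the three rules: the file number may stay the same or grow by
// one; offsets within one file must not decrease; a new file never starts
// with a zero end offset.
func valid(a, b entry) bool {
	switch {
	case b.filenum != a.filenum && b.filenum != a.filenum+1:
		return false
	case b.filenum == a.filenum && b.offset < a.offset:
		return false
	case b.filenum == a.filenum+1 && b.offset == 0:
		return false
	}
	return true
}

func main() {
	// Two well-formed entries followed by crash leftovers (zero bytes).
	raw := []byte{
		0, 1, 0, 0, 0, 10, // file 1, data ends at offset 10
		0, 1, 0, 0, 0, 30, // file 1, data ends at offset 30
		0, 0, 0, 0, 0, 0, // garbage: file number regressed to 0
	}
	prev := parse(raw[:6])
	for off := 6; off+6 <= len(raw); off += 6 {
		next := parse(raw[off : off+6])
		if !valid(prev, next) {
			fmt.Printf("index corrupted at byte %d, truncate here\n", off)
			return
		}
		prev = next
	}
	fmt.Println("index is well-formed")
}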
diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go
index b6ae39446c..eadcfacef7 100644
--- a/triedb/pathdb/disklayer.go
+++ b/triedb/pathdb/disklayer.go
@@ -202,7 +202,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
 		force = true
 	}
-	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
+	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
 		return nil, err
 	}
 	// To remove outdated history objects from the end, we set the 'tail' parameter
@@ -267,7 +267,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
 	if dl.stale {
 		return errSnapshotStale
 	}
-	return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
+	return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
 }
 
 // size returns the approximate size of cached nodes in the disk layer.
diff --git a/triedb/pathdb/nodebuffer.go b/triedb/pathdb/nodebuffer.go
index d3492602c8..1f029cba86 100644
--- a/triedb/pathdb/nodebuffer.go
+++ b/triedb/pathdb/nodebuffer.go
@@ -194,9 +194,9 @@ func (b *nodebuffer) empty() bool {
 
 // setSize sets the buffer size to the provided number, and invokes a flush
 // operation if the current memory usage exceeds the new limit.
-func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
 	b.limit = uint64(size)
-	return b.flush(db, clean, id, false)
+	return b.flush(db, freezer, clean, id, false)
 }
 
 // allocBatch returns a database batch with pre-allocated buffer.
@@ -214,7 +214,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
-func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
+func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
 	if b.size <= b.limit && !force {
 		return nil
 	}
@@ -227,6 +227,11 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 		start = time.Now()
 		batch = b.allocBatch(db)
 	)
+	// Explicitly sync the state freezer, ensuring that all written
+	// data is transferred to disk before updating the key-value store.
+	if err := freezer.Sync(); err != nil {
+		return err
+	}
 	nodes := writeNodes(batch, b.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
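This Sync call is what makes the deferred fsync in freezer_batch.go safe: the freezer is made durable before the key-value batch carrying the persistent state id is committed, so the id on disk can never point at history that has not yet reached the disk. Below is a minimal sketch of the same write barrier using plain files; the file names and the tiny on-disk format are illustrative assumptions, not pathdb's actual layout.

package main

import (
	"encoding/binary"
	"log"
	"os"
)

func main() {
	ancient, err := os.OpenFile("ancient.dat", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		log.Fatal(err)
	}
	defer ancient.Close()

	// Step 1: append the history item (buffered, not yet durable).
	if _, err := ancient.Write([]byte("state history #42")); err != nil {
		log.Fatal(err)
	}
	// Step 2: fsync the freezer side. If the machine dies before this
	// returns, the pointer below was never written, so nothing dangles.
	if err := ancient.Sync(); err != nil {
		log.Fatal(err)
	}
	// Step 3: only now publish the persistent state id. A reader that sees
	// id=42 is guaranteed the matching history is already on disk.
	var id [8]byte
	binary.BigEndian.PutUint64(id[:], 42)
	if err := os.WriteFile("persistent-id", id[:], 0o644); err != nil {
		log.Fatal(err)
	}
}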