diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go
index 84a63a4518..4e1b38df29 100644
--- a/core/rawdb/freezer_batch.go
+++ b/core/rawdb/freezer_batch.go
@@ -180,10 +180,10 @@ func (batch *freezerTableBatch) maybeCommit() error {
 	return nil
 }
 
-// commit writes the batched items to the backing freezerTable.
+// commit writes the batched items to the backing freezerTable. Note that the
+// index file isn't fsync'd after the write; the most recent writes can be
+// lost after a power failure.
 func (batch *freezerTableBatch) commit() error {
-	// Write data. The head file is fsync'd after write to ensure the
-	// data is truly transferred to disk.
 	_, err := batch.t.head.Write(batch.dataBuffer)
 	if err != nil {
 		return err
 	}
@@ -194,15 +194,10 @@ func (batch *freezerTableBatch) commit() error {
 	dataSize := int64(len(batch.dataBuffer))
 	batch.dataBuffer = batch.dataBuffer[:0]
 
-	// Write indices. The index file is fsync'd after write to ensure the
-	// data indexes are truly transferred to disk.
 	_, err = batch.t.index.Write(batch.indexBuffer)
 	if err != nil {
 		return err
 	}
-	if err := batch.t.index.Sync(); err != nil {
-		return err
-	}
 	indexSize := int64(len(batch.indexBuffer))
 	batch.indexBuffer = batch.indexBuffer[:0]
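With the per-commit fsync of the index file removed, a freezer batch commit no longer guarantees that the most recent writes have reached disk. Durability becomes the caller's responsibility: whoever records in the key-value store that data up to a certain ID has been frozen must sync the ancient store first, which is what the pathdb changes at the end of this diff do. Below is a minimal caller-side sketch of that ordering, assuming only the existing ethdb.AncientWriter.Sync and ethdb.Batch.Write methods; the helper name persistFrozenMarker is hypothetical.

package example

import "github.com/ethereum/go-ethereum/ethdb"

// persistFrozenMarker flushes the freezer's data and index files to disk
// before committing the key-value entries that reference them, so a crash
// can never leave the marker pointing past what the freezer has persisted.
func persistFrozenMarker(frdb ethdb.AncientWriter, batch ethdb.Batch) error {
	if err := frdb.Sync(); err != nil {
		return err
	}
	return batch.Write()
}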
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index 4b9d510e82..a8ed17b371 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -17,6 +17,7 @@
 package rawdb
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/binary"
 	"errors"
@@ -26,6 +27,7 @@
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
@@ -219,7 +221,13 @@ func (t *freezerTable) repair() error {
 			return err
 		} // New file can't trigger this path
 	}
-	// Retrieve the file sizes and prepare for truncation
+	// Validate the index file, as it might contain garbage data left over
+	// after a power failure.
+	if err := t.repairIndex(); err != nil {
+		return err
+	}
+	// Retrieve the file sizes and prepare for truncation. Note the file size
+	// might have changed after the index validation.
 	if stat, err = t.index.Stat(); err != nil {
 		return err
 	}
@@ -364,6 +372,133 @@ func (t *freezerTable) repair() error {
 	return nil
 }
 
+// repairIndex validates the integrity of the index file. According to the design,
+// the initial entry in the file denotes the earliest data file along with the
+// count of deleted items. Following this, all subsequent entries in the file must
+// be in order. This function identifies any corrupted entries and truncates the
+// items occurring after the corruption point.
+//
+// Corruption can occur because of a power failure. In the Linux kernel, the
+// file metadata update and the data update are not necessarily performed at
+// the same time. Typically, the metadata will be flushed/journalled ahead of
+// the file data. Therefore, we make the pessimistic assumption that the file
+// is first extended with invalid "garbage" data (normally zero bytes) and that
+// afterwards the correct data replaces the garbage. As all items in the index
+// file are supposed to be in order, the leftover garbage must be truncated
+// before the index data is utilized.
+//
+// It's important to note an exception that's unfortunately undetectable: when
+// all index entries in the file are zero. Distinguishing whether they represent
+// leftover garbage or if all items in the table have zero size is impossible.
+// In such instances, the file will remain unchanged to prevent potential data
+// loss or misinterpretation.
+func (t *freezerTable) repairIndex() error {
+	// Retrieve the file sizes and prepare for validation
+	stat, err := t.index.Stat()
+	if err != nil {
+		return err
+	}
+	size := stat.Size()
+
+	// Move the read cursor to the beginning of the file
+	_, err = t.index.Seek(0, io.SeekStart)
+	if err != nil {
+		return err
+	}
+	fr := bufio.NewReader(t.index)
+
+	var (
+		start = time.Now()
+		buff  = make([]byte, indexEntrySize)
+		prev  indexEntry
+		head  indexEntry
+
+		read = func() (indexEntry, error) {
+			n, err := io.ReadFull(fr, buff)
+			if err != nil {
+				return indexEntry{}, err
+			}
+			if n != indexEntrySize {
+				return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n)
+			}
+			var entry indexEntry
+			entry.unmarshalBinary(buff)
+			return entry, nil
+		}
+		truncate = func(offset int64) error {
+			if t.readonly {
+				return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
+			}
+			if err := truncateFreezerFile(t.index, offset); err != nil {
+				return err
+			}
+			log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
+			return nil
+		}
+	)
+	for offset := int64(0); offset < size; offset += indexEntrySize {
+		entry, err := read()
+		if err != nil {
+			return err
+		}
+		if offset == 0 {
+			head = entry
+			continue
+		}
+		// Ensure that the first non-head index refers to the earliest file,
+		// or the next file if the earliest file has no space to place the
+		// first item.
+		if offset == indexEntrySize {
+			if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
+				log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum)
+				return truncate(offset)
+			}
+			prev = entry
+			continue
+		}
+		// Ensure two consecutive index items are in order
+		if err := t.checkIndexItems(prev, entry); err != nil {
+			log.Error("Corrupted index item detected", "err", err)
+			return truncate(offset)
+		}
+		prev = entry
+	}
+	// Move the read cursor to the end of the file. While theoretically the
+	// cursor should already be at the end after reading all the items in the
+	// file, perform the seek operation anyway as a precaution.
+	_, err = t.index.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
+	log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
+	return nil
+}
+
+// checkIndexItems validates the correctness of two consecutive index items based
+// on the following rules:
+//
+//   - The file number of two consecutive index items must either be the same or
+//     increase by exactly one. If the file number decreases, or skips ahead in a
+//     non-sequential manner, the index item is considered invalid.
+//
+//   - For index items with the same file number, the data offset must be in
+//     non-decreasing order. Note: two index items with the same file number
+//     and the same data offset are permitted if the entry size is zero.
+//
+//   - The first index item in a new data file must not have a zero data offset.
+func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
+	if b.filenum != a.filenum && b.filenum != a.filenum+1 {
+		return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
+	}
+	if b.filenum == a.filenum && b.offset < a.offset {
+		return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
+	}
+	if b.filenum == a.filenum+1 && b.offset == 0 {
+		return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
+	}
+	return nil
+}
+
 // preopen opens all files that the freezer will need. This method should be called from an init-context,
 // since it assumes that it doesn't have to bother with locking
 // The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
index 91b4943e59..fd6e3cf199 100644
--- a/core/rawdb/freezer_table_test.go
+++ b/core/rawdb/freezer_table_test.go
@@ -1367,3 +1367,69 @@ func TestRandom(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestIndexValidation(t *testing.T) {
+	const (
+		items    = 30
+		dataSize = 10
+	)
+	garbage := indexEntry{
+		filenum: 100,
+		offset:  200,
+	}
+	var cases = []struct {
+		offset   int64
+		data     []byte
+		expItems int
+	}{
+		// extend index file with zero bytes at the end
+		{
+			offset:   (items + 1) * indexEntrySize,
+			data:     make([]byte, indexEntrySize),
+			expItems: 30,
+		},
+		// write garbage in the first non-head item
+		{
+			offset:   indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: 0,
+		},
+		// write garbage in a middle item
+		{
+			offset:   (items/2 + 1) * indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: items / 2,
+		},
+	}
+	for _, c := range cases {
+		fn := fmt.Sprintf("t-%d", rand.Uint64())
+		f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		if err != nil {
+			t.Fatal(err)
+		}
+		writeChunks(t, f, items, dataSize)
+
+		// write corrupted data
+		f.index.WriteAt(c.data, c.offset)
+		f.Close()
+
+		// reopen the table, corruption should be truncated
+		f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for i := 0; i < c.expItems; i++ {
+			exp := getChunk(10, i)
+			got, err := f.Retrieve(uint64(i))
+			if err != nil {
+				t.Fatalf("Failed to read from table, %v", err)
+			}
+			if !bytes.Equal(exp, got) {
+				t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
+			}
+		}
+		if f.items.Load() != uint64(c.expItems) {
+			t.Fatalf("Unexpected item number, want: %d, got: %d", c.expItems, f.items.Load())
+		}
+	}
+}
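To make the recovery model above concrete: assuming the freezer's 6-byte index entry layout (a big-endian uint16 file number followed by a uint32 data offset), a power failure can leave the index file extended with zeroed entries, and the first entry that violates the ordering rules marks the truncation point. The standalone sketch below is illustrative only; it ignores the special semantics of the head entry described in the repairIndex comment, and the names entry and validLength are hypothetical.

package main

import (
	"encoding/binary"
	"fmt"
)

const entrySize = 6 // uint16 file number + uint32 data offset

type entry struct {
	filenum uint16
	offset  uint32
}

func parse(b []byte) entry {
	return entry{
		filenum: binary.BigEndian.Uint16(b[:2]),
		offset:  binary.BigEndian.Uint32(b[2:6]),
	}
}

// validLength returns the number of index bytes satisfying the ordering
// invariants; everything beyond that point would be truncated on repair.
func validLength(index []byte) int {
	var prev entry
	for off := 0; off+entrySize <= len(index); off += entrySize {
		cur := parse(index[off : off+entrySize])
		if off > 0 {
			sameFile := cur.filenum == prev.filenum && cur.offset >= prev.offset
			nextFile := cur.filenum == prev.filenum+1 && cur.offset != 0
			if !sameFile && !nextFile {
				return off // corruption starts here
			}
		}
		prev = cur
	}
	return len(index) / entrySize * entrySize
}

func main() {
	// Two consistent entries followed by a zeroed "garbage" entry, as left
	// behind by a crash that extended the file before the data reached disk.
	index := make([]byte, 3*entrySize)
	binary.BigEndian.PutUint16(index[0:2], 1)    // head: file 1
	binary.BigEndian.PutUint32(index[2:6], 0)    // head offset
	binary.BigEndian.PutUint16(index[6:8], 1)    // item 1: file 1
	binary.BigEndian.PutUint32(index[8:12], 100) // item 1 ends at offset 100
	// index[12:18] stays zero: the offset drops from 100 to 0 -> invalid.
	fmt.Println(validLength(index)) // prints 12: only the first two entries survive
}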
diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go
index b6ae39446c..eadcfacef7 100644
--- a/triedb/pathdb/disklayer.go
+++ b/triedb/pathdb/disklayer.go
@@ -202,7 +202,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
 		force = true
 	}
-	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
+	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
 		return nil, err
 	}
 	// To remove outdated history objects from the end, we set the 'tail' parameter
@@ -267,7 +267,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
 	if dl.stale {
 		return errSnapshotStale
 	}
-	return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
+	return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
 }
 
 // size returns the approximate size of cached nodes in the disk layer.
diff --git a/triedb/pathdb/nodebuffer.go b/triedb/pathdb/nodebuffer.go
index d3492602c8..a4788ff9ba 100644
--- a/triedb/pathdb/nodebuffer.go
+++ b/triedb/pathdb/nodebuffer.go
@@ -194,9 +194,9 @@ func (b *nodebuffer) empty() bool {
 
 // setSize sets the buffer size to the provided number, and invokes a flush
 // operation if the current memory usage exceeds the new limit.
-func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
 	b.limit = uint64(size)
-	return b.flush(db, clean, id, false)
+	return b.flush(db, freezer, clean, id, false)
 }
 
 // allocBatch returns a database batch with pre-allocated buffer.
@@ -214,7 +214,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
-func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
+func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
 	if b.size <= b.limit && !force {
 		return nil
 	}
@@ -227,6 +227,13 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 		start = time.Now()
 		batch = b.allocBatch(db)
 	)
+	// Explicitly sync the state freezer, ensuring that all written
+	// data is transferred to disk before updating the key-value store.
+	if freezer != nil {
+		if err := freezer.Sync(); err != nil {
+			return err
+		}
+	}
 	nodes := writeNodes(batch, b.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)