core/rawdb, triedb/pathdb: various fixes

Gary Rong 6 months ago
parent f9bdae4392
commit 2deeb9d89d
Changed files:
  core/rawdb/freezer_batch.go (4 changed lines)
  core/rawdb/freezer_table.go (67 changed lines)
  core/rawdb/freezer_table_test.go (66 changed lines)
  triedb/pathdb/nodebuffer.go (6 changed lines)

core/rawdb/freezer_batch.go
@@ -180,8 +180,8 @@ func (batch *freezerTableBatch) maybeCommit() error {
 	return nil
 }
 
-// commit writes the batched items to the backing freezerTable. Note both index
-// and data file are not fsync'd after the file write, the data could be lost
-// after the power failure.
+// commit writes the batched items to the backing freezerTable. Note that neither
+// the index file nor the data file is fsync'd after the write, so the most
+// recent writes may be lost after a power failure.
 func (batch *freezerTableBatch) commit() error {
 	_, err := batch.t.head.Write(batch.dataBuffer)

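The distinction the new comment draws matters: a successful Write only hands data to the OS page cache, and durability requires an explicit fsync. A minimal standalone sketch of that gap using a plain os.File (illustration only, not the freezer code; path and function name are made up):

	package main

	import (
		"log"
		"os"
	)

	// appendDurable appends data and then fsyncs. Without the final Sync call
	// the write only reaches the OS page cache, and a power failure can lose
	// data that Write had already accepted -- exactly the window the freezer
	// comment above describes.
	func appendDurable(path string, data []byte) error {
		f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
		if err != nil {
			return err
		}
		defer f.Close()

		if _, err := f.Write(data); err != nil { // page cache only
			return err
		}
		return f.Sync() // force the kernel to flush to stable storage
	}

	func main() {
		if err := appendDurable("example.dat", []byte("payload")); err != nil {
			log.Fatal(err)
		}
	}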
core/rawdb/freezer_table.go
@@ -17,6 +17,7 @@
 package rawdb
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/binary"
 	"errors"
@@ -377,9 +378,9 @@ func (t *freezerTable) repair() error {
 // be in order. This function identifies any corrupted entries and truncates items
 // occurring after the corruption point.
 //
-// The corruption can be made because of the power failure. As in the Linux kernel,
-// the file metadata update and data update are not necessarily performed at the
-// same time. Typically, the metadata will be flushed/journaled ahead of the file
+// Corruption can occur because of a power failure. In the Linux kernel, the
+// file metadata update and the data update are not necessarily performed at
+// the same time. Typically, the metadata is flushed/journalled ahead of the file
 // data. Therefore, we make the pessimistic assumption that the file is first
 // extended with invalid "garbage" data (normally zero bytes) and that afterwards
 // the correct data replaces the garbage. As all the items in index file are
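Because appended items must stay in order, validation never has to parse the data files: comparing neighbouring index entries suffices. A rough sketch of the invariant with a hypothetical helper (not the actual geth code):

	package main

	import "fmt"

	type indexEntry struct{ filenum, offset uint32 }

	// ordered reports whether entry cur may legally follow prev in the index.
	// Items are strictly appended: the data-file number may only stay the same
	// or advance by one, and end-offsets within a single file never decrease.
	// Any entry violating this (e.g. zero-filled garbage left by a power cut)
	// marks the truncation point.
	func ordered(prev, cur indexEntry) bool {
		switch cur.filenum {
		case prev.filenum:
			return cur.offset >= prev.offset
		case prev.filenum + 1:
			return true // the table rolled over into a fresh data file
		default:
			return false
		}
	}

	func main() {
		good := ordered(indexEntry{2, 100}, indexEntry{2, 160})
		bad := ordered(indexEntry{2, 100}, indexEntry{0, 0}) // zeroed garbage
		fmt.Println(good, bad)                               // true false
	}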
@@ -399,36 +400,29 @@ func (t *freezerTable) repairIndex() error {
 	}
 	size := stat.Size()
 
 	// Move the read cursor to the beginning of the file.
 	_, err = t.index.Seek(0, 0)
 	if err != nil {
 		return err
 	}
+	fr := bufio.NewReader(t.index)
+
 	var (
-		start     = time.Now()
-		batchSize = int64(indexEntrySize * 1024 * 1024)
-		buffer    = make([]byte, batchSize) // pre-allocate for batch reading
-		prev      indexEntry
-		head      indexEntry
-
-		readOffset int64
-		consumed   int64
-
-		read = func(offset int64) (indexEntry, error) {
-			if offset+indexEntrySize > size {
-				return indexEntry{}, fmt.Errorf("slice bounds out of range, offset: %d, size: %d", offset, size)
-			}
-			if offset >= readOffset {
-				n, err := t.index.ReadAt(buffer, readOffset)
-				if err != nil && !errors.Is(err, io.EOF) {
-					return indexEntry{}, err
-				}
-				expect := batchSize
-				if size-readOffset < batchSize {
-					expect = size - readOffset
-				}
-				if expect != int64(n) {
-					return indexEntry{}, fmt.Errorf("failed to read from index, want: %d, got: %d", expect, n)
-				}
-				consumed = readOffset
-				readOffset += int64(n)
-			}
+		start = time.Now()
+		buff  = make([]byte, indexEntrySize)
+		prev  indexEntry
+		head  indexEntry
+
+		read = func() (indexEntry, error) {
+			n, err := fr.Read(buff)
+			if err != nil {
+				return indexEntry{}, err
+			}
+			if n != indexEntrySize {
+				return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n)
+			}
 			var entry indexEntry
-			entry.unmarshalBinary(buffer[offset-consumed : offset-consumed+indexEntrySize])
+			entry.unmarshalBinary(buff)
 			return entry, nil
 		}
 		truncate = func(offset int64) error {
@@ -443,7 +437,7 @@ func (t *freezerTable) repairIndex() error {
 		}
 	)
 	for offset := int64(0); offset < size; offset += indexEntrySize {
-		entry, err := read(offset)
+		entry, err := read()
 		if err != nil {
 			return err
 		}
@@ -451,9 +445,11 @@ func (t *freezerTable) repairIndex() error {
 			head = entry
 			continue
 		}
-		// ensure the first non-head index denotes to the earliest file
+		// Ensure that the first non-head index refers to the earliest file,
+		// or to the next file if the earliest file has insufficient space to
+		// place the first item.
 		if offset == indexEntrySize {
-			if entry.filenum != head.filenum {
+			if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
 				return truncate(offset)
 			}
 			prev = entry
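The relaxed head.filenum+1 case exists because the head entry only names the earliest data file (its offset field holds the deleted-item count); if that file has no room left for the first retained item, the item's end offset is recorded against the following file. A small illustration with made-up numbers:

	package main

	import "fmt"

	type indexEntry struct{ filenum, offset uint32 }

	func main() {
		// Hypothetical values: the head entry names data file 3 as the
		// earliest one and records 5 deleted items. If file 3 is already
		// filled to the size limit, the first retained item lands in file 4,
		// so a first non-head entry referencing head.filenum+1 is consistent.
		head := indexEntry{filenum: 3, offset: 5}
		first := indexEntry{filenum: 4, offset: 42}

		ok := first.filenum == head.filenum || first.filenum == head.filenum+1
		fmt.Println(ok) // true, so no truncation at this offset
	}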
@@ -465,6 +461,13 @@ func (t *freezerTable) repairIndex() error {
 		}
 		prev = entry
 	}
+	// Move the read cursor to the end of the file. While in theory the cursor
+	// should already be there after reading all the items, perform the seek
+	// anyway as a precaution.
+	_, err = t.index.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
 	log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
 	return nil
 }
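Replacing the hand-rolled ReadAt batching with a bufio.Reader keeps the scan sequential while the standard library handles buffering; the n != indexEntrySize guard rejects short reads. The same pattern can be sketched with io.ReadFull, which folds the length check into the returned error (standalone example, not the freezer code; the 6-byte big-endian record mirrors indexEntry, and the file name is illustrative):

	package main

	import (
		"bufio"
		"encoding/binary"
		"fmt"
		"io"
		"os"
	)

	const recordSize = 6 // 2-byte file number + 4-byte end offset

	// readRecords sequentially decodes fixed-size records from a file through
	// a buffered reader. io.ReadFull turns a short read into
	// io.ErrUnexpectedEOF, so no explicit length check is needed.
	func readRecords(path string) error {
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()

		r := bufio.NewReader(f)
		buf := make([]byte, recordSize)
		for {
			if _, err := io.ReadFull(r, buf); err != nil {
				if err == io.EOF {
					return nil // clean end of file
				}
				return err // truncated record or read failure
			}
			filenum := binary.BigEndian.Uint16(buf[:2])
			offset := binary.BigEndian.Uint32(buf[2:])
			fmt.Println(filenum, offset)
		}
	}

	func main() {
		if err := readRecords("table.ridx"); err != nil {
			fmt.Println(err)
		}
	}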

core/rawdb/freezer_table_test.go
@@ -1367,3 +1367,69 @@ func TestRandom(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestIndexValidation(t *testing.T) {
+	const (
+		items    = 30
+		dataSize = 10
+	)
+	garbage := indexEntry{
+		filenum: 100,
+		offset:  200,
+	}
+	var cases = []struct {
+		offset   int64
+		data     []byte
+		expItems int
+	}{
+		// extend index file with zero bytes at the end
+		{
+			offset:   (items + 1) * indexEntrySize,
+			data:     make([]byte, indexEntrySize),
+			expItems: 30,
+		},
+		// write garbage in the first non-head item
+		{
+			offset:   indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: 0,
+		},
+		// write garbage into the middle of the index file
+		{
+			offset:   (items/2 + 1) * indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: items / 2,
+		},
+	}
+	for _, c := range cases {
+		fn := fmt.Sprintf("t-%d", rand.Uint64())
+		f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		if err != nil {
+			t.Fatal(err)
+		}
+		writeChunks(t, f, items, dataSize)
+
+		// write corrupted data into the index file
+		f.index.WriteAt(c.data, c.offset)
+		f.Close()
+
+		// reopen the table, corruption should be truncated
+		f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for i := 0; i < c.expItems; i++ {
+			exp := getChunk(10, i)
+			got, err := f.Retrieve(uint64(i))
+			if err != nil {
+				t.Fatalf("Failed to read from table, %v", err)
+			}
+			if !bytes.Equal(exp, got) {
+				t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
+			}
+		}
+		if f.items.Load() != uint64(c.expItems) {
+			t.Fatalf("Unexpected item number, want: %d, got: %d", c.expItems, f.items.Load())
+		}
+	}
+}
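With a standard checkout, the new case can presumably be run in isolation via something like go test ./core/rawdb -run TestIndexValidation -v, which exercises all three corruption scenarios (trailing zero bytes, a garbled first non-head entry, and garbage mid-file) against freshly created tables.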

triedb/pathdb/nodebuffer.go
@@ -229,8 +229,10 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter,
 	)
 	// Explicitly sync the state freezer, ensuring that all written
 	// data is transferred to disk before updating the key-value store.
-	if err := freezer.Sync(); err != nil {
-		return err
+	if freezer != nil {
+		if err := freezer.Sync(); err != nil {
+			return err
+		}
 	}
 	nodes := writeNodes(batch, b.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
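The guard matters because the state freezer is optional: when it is not configured, freezer is nil and the old code would panic on freezer.Sync(). A minimal sketch of the pattern with hypothetical interfaces (not the pathdb types), preserving the sync-before-commit ordering the comment describes:

	package main

	import "fmt"

	// Syncer is a stand-in for the ancient store's sync capability.
	type Syncer interface {
		Sync() error
	}

	// flush first forces optional ancient data to disk, then commits the
	// key-value batch. The ordering matters: if the process dies between the
	// two steps, the key-value store must never reference ancient data that
	// was still sitting in an unsynced buffer.
	func flush(freezer Syncer, commit func() error) error {
		if freezer != nil { // the freezer is optional; skip when unconfigured
			if err := freezer.Sync(); err != nil {
				return err
			}
		}
		return commit()
	}

	func main() {
		// With no freezer configured, flush must not panic.
		err := flush(nil, func() error { return nil })
		fmt.Println(err) // <nil>
	}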
