From 331de17e4d773803c0d507bd574361f777acdf57 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sun, 14 Apr 2019 21:25:32 +0200 Subject: [PATCH] core/rawdb: support starting offset for future deletion --- core/rawdb/freezer_table.go | 65 ++++++++++++-- core/rawdb/freezer_table_test.go | 140 ++++++++++++++++++++++++++----- 2 files changed, 175 insertions(+), 30 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 8e117301b2..93636a5ba7 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -81,9 +81,14 @@ type freezerTable struct { head *os.File // File descriptor for the data head of the table files map[uint32]*os.File // open files headId uint32 // number of the currently active head file + tailId uint32 // number of the earliest file index *os.File // File descriptor for the indexEntry file of the table - items uint64 // Number of items stored in the table + // In the case that old items are deleted (from the tail), we use itemOffset + // to count how many historic items have gone missing. + items uint64 // Number of items stored in the table (including items removed from tail) + itemOffset uint32 // Offset (number of discarded items) + headBytes uint32 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -164,10 +169,19 @@ func (t *freezerTable) repair() error { // Open the head file var ( + firstIndex indexEntry lastIndex indexEntry contentSize int64 contentExp int64 ) + // Read index zero, determine what file is the earliest + // and what item offset to use + t.index.ReadAt(buffer, 0) + firstIndex.unmarshalBinary(buffer) + + t.tailId = firstIndex.offset + t.itemOffset = firstIndex.filenum + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) @@ -225,7 +239,7 @@ func (t *freezerTable) repair() error { return err } // Update the item and byte counters and return - t.items = uint64(offsetsSize/indexEntrySize - 1) // last indexEntry points to the end of the data file + t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file t.headBytes = uint32(contentSize) t.headId = lastIndex.filenum @@ -245,7 +259,7 @@ func (t *freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) // Open all except head in RDONLY - for i := uint32(0); i < t.headId; i++ { + for i := uint32(t.tailId); i < t.headId; i++ { if _, err = t.openFile(i, os.O_RDONLY); err != nil { return err } @@ -259,7 +273,8 @@ func (t *freezerTable) preopen() (err error) { func (t *freezerTable) truncate(items uint64) error { t.lock.Lock() defer t.lock.Unlock() - // If out item count is corrent, don't do anything + + // If our item count is correct, don't do anything if atomic.LoadUint64(&t.items) <= items { return nil } @@ -275,6 +290,7 @@ func (t *freezerTable) truncate(items uint64) error { } var expected indexEntry expected.unmarshalBinary(buffer) + // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing @@ -290,7 +306,6 @@ func (t *freezerTable) truncate(items uint64) error { t.head = newHead atomic.StoreUint32(&t.headId, expected.filenum) } - if err := t.head.Truncate(int64(expected.offset)); err != nil { return err } @@ -330,9 +345,9 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { if f, exist = t.files[num]; !exist { var name string if t.noCompression { - name = fmt.Sprintf("%s.%d.rdat", t.name, num) + name = fmt.Sprintf("%s.%04d.rdat", t.name, num) } else { - name = fmt.Sprintf("%s.%d.cdat", t.name, num) + name = fmt.Sprintf("%s.%04d.cdat", t.name, num) } f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644) if err != nil { @@ -376,11 +391,13 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.lock.RLock() // Ensure the table is still accessible if t.index == nil || t.head == nil { + t.lock.RUnlock() return errClosed } // Ensure only the next item can be written, nothing else if atomic.LoadUint64(&t.items) != item { - panic(fmt.Sprintf("appending unexpected item: want %d, have %d", t.items, item)) + t.lock.RUnlock() + return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) } // Encode the blob and write it into the data file if !t.noCompression { @@ -461,13 +478,20 @@ func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { if atomic.LoadUint64(&t.items) <= item { return nil, errOutOfBounds } + // Ensure the item was not deleted from the tail either + offset := atomic.LoadUint32(&t.itemOffset) + if uint64(offset) > item { + return nil, errOutOfBounds + } t.lock.RLock() - startOffset, endOffset, filenum, err := t.getBounds(item) + startOffset, endOffset, filenum, err := t.getBounds(item - uint64(offset)) if err != nil { + t.lock.RUnlock() return nil, err } dataFile, exist := t.files[filenum] if !exist { + t.lock.RUnlock() return nil, fmt.Errorf("missing data file %d", filenum) } // Retrieve the data itself, decompress and return @@ -499,3 +523,26 @@ func (t *freezerTable) Sync() error { } return t.head.Sync() } + +// printIndex is a debug print utility function for testing +func (t *freezerTable) printIndex() { + buf := make([]byte, indexEntrySize) + + fmt.Printf("|-----------------|\n") + fmt.Printf("| fileno | offset |\n") + fmt.Printf("|--------+--------|\n") + + for i := uint64(0); ; i++ { + if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { + break + } + var entry indexEntry + entry.unmarshalBinary(buf) + fmt.Printf("| %03d | %03d | \n", entry.filenum, entry.offset) + if i > 100 { + fmt.Printf(" ... \n") + break + } + } + fmt.Printf("|-----------------|\n") +} diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index d6ce6e93c0..9a7eec5054 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -19,12 +19,13 @@ package rawdb import ( "bytes" "fmt" - "github.com/ethereum/go-ethereum/metrics" "math/rand" "os" "path/filepath" "testing" "time" + + "github.com/ethereum/go-ethereum/metrics" ) func init() { @@ -32,10 +33,10 @@ func init() { } // Gets a chunk of data, filled with 'b' -func getChunk(size int, b byte) []byte { +func getChunk(size int, b int) []byte { data := make([]byte, size) for i, _ := range data { - data[i] = b + data[i] = byte(b) } return data } @@ -61,7 +62,7 @@ func TestFreezerBasics(t *testing.T) { } defer f.Close() // Write 15 bytes 255 times, results in 85 files - for x := byte(0); x < 255; x++ { + for x := 0; x < 255; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -74,7 +75,7 @@ func TestFreezerBasics(t *testing.T) { //db[1] = 010101010101010101010101010101 //db[2] = 020202020202020202020202020202 - for y := byte(0); y < 255; y++ { + for y := 0; y < 255; y++ { exp := getChunk(15, y) got, err := f.Retrieve(uint64(y)) if err != nil { @@ -84,6 +85,11 @@ func TestFreezerBasics(t *testing.T) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) } } + // Check that we cannot read too far + _, err = f.Retrieve(uint64(255)) + if err != errOutOfBounds { + t.Fatal(err) + } } // TestFreezerBasicsClosing tests same as TestFreezerBasics, but also closes and reopens the freezer between @@ -102,18 +108,15 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times, results in 85 files - for x := byte(0); x < 255; x++ { + for x := 0; x < 255; x++ { data := getChunk(15, x) f.Append(uint64(x), data) f.Close() f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) - if err != nil { - t.Fatal(err) - } } defer f.Close() - for y := byte(0); y < 255; y++ { + for y := 0; y < 255; y++ { exp := getChunk(15, y) got, err := f.Retrieve(uint64(y)) if err != nil { @@ -142,7 +145,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times - for x := byte(0); x < 0xff; x++ { + for x := 0; x < 255; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -190,7 +193,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times - for x := byte(0); x < 0xff; x++ { + for x := 0; x < 0xff; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -223,7 +226,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Errorf("Expected error for missing index entry") } // We should now be able to store items again, from item = 1 - for x := byte(1); x < 0xff; x++ { + for x := 1; x < 0xff; x++ { data := getChunk(15, ^x) f.Append(uint64(x), data) } @@ -232,7 +235,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { // And if we open it, we should now be able to read all of them (new values) { f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) - for y := byte(1); y < 255; y++ { + for y := 1; y < 255; y++ { exp := getChunk(15, ^y) got, err := f.Retrieve(uint64(y)) if err != nil { @@ -257,7 +260,7 @@ func TestSnappyDetection(t *testing.T) { t.Fatal(err) } // Write 15 bytes 255 times - for x := byte(0); x < 0xff; x++ { + for x := 0; x < 0xff; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -308,7 +311,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { t.Fatal(err) } // Write 15 bytes 9 times : 150 bytes - for x := byte(0); x < 9; x++ { + for x := 0; x < 9; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -321,7 +324,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { // File sizes should be 45, 45, 45 : items[3, 3, 3) } // Crop third file - fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.2.rdat", fname)) + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname)) // Truncate third file: 45 ,45, 20 { if err := assertFileSize(fileToCrop, 45); err != nil { @@ -365,7 +368,7 @@ func TestFreezerTruncate(t *testing.T) { t.Fatal(err) } // Write 15 bytes 30 times - for x := byte(0); x < 30; x++ { + for x := 0; x < 30; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -416,7 +419,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { f.Close() } // Truncate the file in half - fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.1.rdat", fname)) + fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname)) { if err := assertFileSize(fileToCrop, 40); err != nil { t.Fatal(err) @@ -463,7 +466,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { t.Fatal(err) } // Write 15 bytes 30 times - for x := byte(0); x < 30; x++ { + for x := 0; x < 30; x++ { data := getChunk(15, x) f.Append(uint64(x), data) } @@ -489,7 +492,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { // Now, truncate back to zero f.truncate(0) // Write the data again - for x := byte(0); x < 30; x++ { + for x := 0; x < 30; x++ { data := getChunk(15, ^x) if err := f.Append(uint64(x), data); err != nil { t.Fatalf("error %v", err) @@ -499,6 +502,101 @@ func TestFreezerReadAndTruncate(t *testing.T) { } } +func TestOffset(t *testing.T) { + t.Parallel() + wm, rm := metrics.NewMeter(), metrics.NewMeter() + fname := fmt.Sprintf("offset-%d", rand.Uint64()) + { // Fill table + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true) + if err != nil { + t.Fatal(err) + } + // Write 6 x 20 bytes, splitting out into three files + f.Append(0, getChunk(20, 0xFF)) + f.Append(1, getChunk(20, 0xEE)) + + f.Append(2, getChunk(20, 0xdd)) + f.Append(3, getChunk(20, 0xcc)) + + f.Append(4, getChunk(20, 0xbb)) + f.Append(5, getChunk(20, 0xaa)) + f.printIndex() + f.Close() + } + // Now crop it. + { + // delete files 0 and 1 + for i := 0; i < 2; i++ { + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i)) + if err := os.Remove(p); err != nil { + t.Fatal(err) + } + } + // Read the index file + p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) + indexFile, err := os.OpenFile(p, os.O_RDWR, 0644) + if err != nil { + t.Fatal(err) + } + indexBuf := make([]byte, 7*indexEntrySize) + indexFile.Read(indexBuf) + + // Update the index file, so that we store + // [ file = 2, offset = 4 ] at index zero + + tailId := uint32(2) // First file is 2 + itemOffset := uint32(4) // We have removed four items + zeroIndex := indexEntry{ + offset: tailId, + filenum: itemOffset, + } + buf := zeroIndex.marshallBinary() + // Overwrite index zero + copy(indexBuf, buf) + // Remove the four next indices by overwriting + copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) + indexFile.WriteAt(indexBuf, 0) + // Need to truncate the moved index items + indexFile.Truncate(indexEntrySize * (1 + 2)) + indexFile.Close() + + } + // Now open again + { + f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true) + if err != nil { + t.Fatal(err) + } + f.printIndex() + // It should allow writing item 6 + f.Append(6, getChunk(20, 0x99)) + + // It should be fine to fetch 4,5,6 + if got, err := f.Retrieve(4); err != nil { + t.Fatal(err) + } else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) { + t.Fatalf("expected %x got %x", exp, got) + } + if got, err := f.Retrieve(5); err != nil { + t.Fatal(err) + } else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) { + t.Fatalf("expected %x got %x", exp, got) + } + if got, err := f.Retrieve(6); err != nil { + t.Fatal(err) + } else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) { + t.Fatalf("expected %x got %x", exp, got) + } + + // It should error at 0, 1,2,3 + for i := 0; i < 4; i++ { + if _, err := f.Retrieve(uint64(i)); err == nil { + t.Fatal("expected err") + } + } + } +} + // TODO (?) // - test that if we remove several head-files, aswell as data last data-file, // the index is truncated accordingly