core/rawdb: introduce index flush offset

Gary Rong 2 months ago
parent e884acb1f9
commit ddefb0959f
  1. core/blockchain.go (9)
  2. core/rawdb/ancient_scheme.go (35)
  3. core/rawdb/ancient_utils.go (7)
  4. core/rawdb/chain_freezer.go (2)
  5. core/rawdb/freezer.go (11)
  6. core/rawdb/freezer_batch.go (2)
  7. core/rawdb/freezer_meta.go (180)
  8. core/rawdb/freezer_meta_test.go (67)
  9. core/rawdb/freezer_resettable.go (2)
  10. core/rawdb/freezer_resettable_test.go (6)
  11. core/rawdb/freezer_table.go (155)
  12. core/rawdb/freezer_table_test.go (314)
  13. core/rawdb/freezer_test.go (45)

@@ -111,21 +111,28 @@ const (
 // * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted
 // * the `Bloom` field of receipt is deleted
 // * the `BlockIndex` and `TxIndex` fields of txlookup are deleted
+//
 // - Version 5
 //  The following incompatible database changes were added:
 //    * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt
 //    * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the
 //      receipts' corresponding block
+//
 // - Version 6
 //  The following incompatible database changes were added:
 //    * Transaction lookup information stores the corresponding block number instead of block hash
+//
 // - Version 7
 //  The following incompatible database changes were added:
 //    * Use freezer as the ancient database to maintain all ancient data
+//
 // - Version 8
 //  The following incompatible database changes were added:
 //    * New scheme for contract code in order to separate the codes and trie nodes
-BlockChainVersion uint64 = 8
+//
+// - Version 9
+//  * The metadata structure of freezer is changed by adding one more field 'IndexFlushOffset'
+BlockChainVersion uint64 = 9
 )
 
 // CacheConfig contains the configuration values for the trie database

@@ -50,10 +50,25 @@ var chainFreezerNoSnappy = map[string]bool{
     ChainFreezerDifficultyTable: true,
 }
 
-const (
-    // stateHistoryTableSize defines the maximum size of freezer data files.
-    stateHistoryTableSize = 2 * 1000 * 1000 * 1000
+// chainFreezerSize configures the maximum size of each freezer table's data files.
+var chainFreezerSize = map[string]uint32{
+    // The size of each item's value is roughly 650 bytes, about 2 million
+    // items per data file.
+    ChainFreezerHeaderTable: 2 * 1000 * 1000 * 1000,
 
+    // The size of each item's value is fixed at 32 bytes, 2 million items
+    // per data file.
+    ChainFreezerHashTable: 64 * 1000 * 1000,
+
+    // The size of each item's value is less than 10 bytes, 2 million items
+    // per data file.
+    ChainFreezerDifficultyTable: 20 * 1000 * 1000,
+
+    ChainFreezerBodiesTable:  2 * 1000 * 1000 * 1000,
+    ChainFreezerReceiptTable: 2 * 1000 * 1000 * 1000,
+}
+
+const (
     // stateHistoryAccountIndex indicates the name of the freezer state history table.
     stateHistoryMeta         = "history.meta"
     stateHistoryAccountIndex = "account.index"
@@ -62,6 +77,7 @@ const (
     stateHistoryStorageData = "storage.data"
 )
 
+// stateFreezerNoSnappy configures whether compression is disabled for the state freezer.
 var stateFreezerNoSnappy = map[string]bool{
     stateHistoryMeta:         true,
     stateHistoryAccountIndex: false,
@@ -70,6 +86,17 @@ var stateFreezerNoSnappy = map[string]bool{
     stateHistoryStorageData: false,
 }
 
+// stateFreezerSize configures the maximum size of each freezer table's data files.
+var stateFreezerSize = map[string]uint32{
+    // The size of each item's value is fixed at 73 bytes, about 2 million
+    // items per data file.
+    stateHistoryMeta: 128 * 1000 * 1000,
+
+    stateHistoryAccountIndex: 2 * 1000 * 1000 * 1000,
+    stateHistoryStorageIndex: 2 * 1000 * 1000 * 1000,
+    stateHistoryAccountData:  2 * 1000 * 1000 * 1000,
+    stateHistoryStorageData:  2 * 1000 * 1000 * 1000,
+}
+
 // The list of identifiers of ancient stores.
 var (
     ChainFreezerName = "chain" // the folder name of chain segment ancient store.
@@ -96,5 +123,5 @@ func NewStateFreezer(ancientDir string, verkle bool, readOnly bool) (ethdb.Reset
     } else {
         name = filepath.Join(ancientDir, MerkleStateFreezerName)
     }
-    return newResettableFreezer(name, "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy)
+    return newResettableFreezer(name, "eth/db/state", readOnly, stateFreezerSize, stateFreezerNoSnappy)
 }
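Taken together, these maps replace the old shared freezerTableSize constant: every table must now carry an explicit size entry, and the constructors refuse to start without one. Below is a minimal, self-contained sketch of that lookup pattern; the table names are made up for illustration and are not the freezer's real table names.

    package main

    import "fmt"

    // Hypothetical per-table settings, mirroring the paired maps the freezer
    // constructors now take: data-file size limits and snappy flags.
    var (
        tableSizes  = map[string]uint32{"headers": 2 * 1000 * 1000 * 1000, "hashes": 64 * 1000 * 1000}
        tableSnappy = map[string]bool{"headers": false, "hashes": true}
    )

    func main() {
        for name, disabled := range tableSnappy {
            size, ok := tableSizes[name]
            if !ok {
                // Mirrors NewFreezer's new behaviour: a missing size entry is fatal.
                panic(fmt.Sprintf("table size for %q is not defined", name))
            }
            fmt.Printf("table %s: max data file %d bytes, snappy disabled: %v\n", name, size, disabled)
        }
    }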

@@ -120,12 +120,13 @@ func InspectFreezerTable(ancient string, freezerName string, tableName string, s
     var (
         path   string
         tables map[string]bool
+        sizes  map[string]uint32
     )
     switch freezerName {
     case ChainFreezerName:
-        path, tables = resolveChainFreezerDir(ancient), chainFreezerNoSnappy
+        path, tables, sizes = resolveChainFreezerDir(ancient), chainFreezerNoSnappy, chainFreezerSize
     case MerkleStateFreezerName, VerkleStateFreezerName:
-        path, tables = filepath.Join(ancient, freezerName), stateFreezerNoSnappy
+        path, tables, sizes = filepath.Join(ancient, freezerName), stateFreezerNoSnappy, stateFreezerSize
     default:
         return fmt.Errorf("unknown freezer, supported ones: %v", freezers)
     }
@@ -137,7 +138,7 @@ func InspectFreezerTable(ancient string, freezerName string, tableName string, s
         }
         return fmt.Errorf("unknown table, supported ones: %v", names)
     }
-    table, err := newFreezerTable(path, tableName, noSnappy, true)
+    table, err := newFreezerTable(path, tableName, noSnappy, true, sizes[tableName])
     if err != nil {
         return err
     }

@@ -64,7 +64,7 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
     if datadir == "" {
         freezer = NewMemoryFreezer(readonly, chainFreezerNoSnappy)
     } else {
-        freezer, err = NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy)
+        freezer, err = NewFreezer(datadir, namespace, readonly, chainFreezerSize, chainFreezerNoSnappy)
     }
     if err != nil {
         return nil, err

@@ -49,9 +49,6 @@ var (
     errSymlinkDatadir = errors.New("symbolic link datadir is not supported")
 )
 
-// freezerTableSize defines the maximum size of freezer data files.
-const freezerTableSize = 2 * 1000 * 1000 * 1000
-
 // Freezer is an append-only database to store immutable ordered data into
 // flat files:
 //
@@ -77,7 +74,7 @@ type Freezer struct {
 //
 // The 'tables' argument defines the data tables. If the value of a map
 // entry is true, snappy compression is disabled for the table.
-func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*Freezer, error) {
+func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize map[string]uint32, tables map[string]bool) (*Freezer, error) {
     // Create the initial freezer object
     var (
         readMeter  = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
@@ -116,7 +113,11 @@ func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
     // Create the tables.
     for name, disableSnappy := range tables {
-        table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly)
+        size, exist := maxTableSize[name]
+        if !exist {
+            return nil, fmt.Errorf("table size for %q is not defined", name)
+        }
+        table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, size, disableSnappy, readonly)
         if err != nil {
             for _, table := range freezer.tables {
                 table.Close()

@@ -194,6 +194,8 @@ func (batch *freezerTableBatch) commit() error {
     dataSize := int64(len(batch.dataBuffer))
     batch.dataBuffer = batch.dataBuffer[:0]
 
+    // The index file is only fsync'd when the head data file is advanced,
+    // to reduce the overhead.
     _, err = batch.t.index.Write(batch.indexBuffer)
     if err != nil {
         return err
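The new comment documents a deliberate durability trade-off: index bytes are written on every batch commit, but fsync is deferred until a data file fills up and the head advances. The sketch below shows the shape of that write-now, sync-on-rotation pattern; it is a toy, not the freezer's actual batching code.

    package main

    import (
        "log"
        "os"
    )

    // appendEntry buffers an index entry with a plain write; the OS may keep
    // it in the page cache, so the bytes are not yet durable.
    func appendEntry(index *os.File, entry []byte) error {
        _, err := index.Write(entry)
        return err
    }

    // rotate marks the point where durability is settled: one fsync per
    // data-file rotation instead of one per committed batch.
    func rotate(index *os.File) error {
        return index.Sync()
    }

    func main() {
        f, err := os.CreateTemp("", "idx-*")
        if err != nil {
            log.Fatal(err)
        }
        defer os.Remove(f.Name())
        defer f.Close()

        for i := 0; i < 3; i++ {
            if err := appendEntry(f, []byte{byte(i), 0, 0, 0, 0, 0}); err != nil {
                log.Fatal(err)
            }
        }
        if err := rotate(f); err != nil { // one fsync covering all three writes
            log.Fatal(err)
        }
        log.Println("index flushed")
    }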

@@ -17,93 +17,167 @@
 package rawdb
 
 import (
+    "errors"
     "io"
     "os"
 
-    "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/rlp"
 )
 
-const freezerVersion = 1 // The initial version tag of freezer table metadata
+const (
+    freezerTableV1 = 1 // Initial version of metadata struct
+    freezerTableV2 = 2 // New field: 'IndexFlushOffset'
+)
 
-// freezerTableMeta wraps all the metadata of the freezer table.
+// freezerTableMeta is a collection of additional properties that describe the
+// freezer table. These properties are designed with error resilience, allowing
+// them to be automatically corrected after an error occurs without significantly
+// impacting overall correctness.
 type freezerTableMeta struct {
-    // Version is the versioning descriptor of the freezer table.
-    Version uint16
+    file    *os.File // file handler of metadata
+    version uint16   // version descriptor of the freezer table
 
-    // VirtualTail indicates how many items have been marked as deleted.
-    // Its value is equal to the number of items removed from the table
-    // plus the number of items hidden in the table, so it should never
-    // be lower than the "actual tail".
-    VirtualTail uint64
-}
+    // virtualTail represents the number of items marked as deleted. It is
+    // calculated as the sum of items removed from the table and the items
+    // hidden within the table, and should never be less than the "actual tail".
+    //
+    // If lost due to a crash or other reasons, it will be reset to the number
+    // of items deleted from the table, causing the previously hidden items to
+    // become visible, which is an acceptable consequence.
+    virtualTail uint64
 
-// newMetadata initializes the metadata object with the given virtual tail.
-func newMetadata(tail uint64) *freezerTableMeta {
-    return &freezerTableMeta{
-        Version:     freezerVersion,
-        VirtualTail: tail,
-    }
+    // indexFlushOffset represents the offset in the index file up to which
+    // all data has been flushed (fsync'd) to disk. Beyond this offset, data
+    // integrity is not guaranteed, and a validation process is required
+    // before using the indexes.
+    //
+    // Typically, the offset refers to the location of the first index entry
+    // in the newest data file. However, in rare cases, this offset might point
+    // to an index entry in a fully flushed data file. This can happen if a
+    // crash occurs after the metadata offset is updated, but before the index
+    // file is modified during the tail truncation operation. In such cases,
+    // more items will be checked in the index validation procedure, which is
+    // an acceptable consequence.
+    indexFlushOffset uint64
 }
 
-// readMetadata reads the metadata of the freezer table from the
-// given metadata file.
-func readMetadata(file *os.File) (*freezerTableMeta, error) {
+// decodeV1 attempts to decode the metadata structure in v1 format. If it fails,
+// or the result is incompatible, nil is returned.
+func decodeV1(file *os.File) *freezerTableMeta {
     _, err := file.Seek(0, io.SeekStart)
     if err != nil {
-        return nil, err
+        return nil
     }
-    var meta freezerTableMeta
-    if err := rlp.Decode(file, &meta); err != nil {
-        return nil, err
+    type obj struct {
+        Version uint16
+        Tail    uint64
     }
-    return &meta, nil
+    var o obj
+    if err := rlp.Decode(file, &o); err != nil {
+        return nil
+    }
+    if o.Version != freezerTableV1 {
+        return nil
+    }
+    return &freezerTableMeta{
+        file:        file,
+        version:     freezerTableV2,
+        virtualTail: o.Tail,
+    }
 }
 
-// writeMetadata writes the metadata of the freezer table into the
-// given metadata file.
-func writeMetadata(file *os.File, meta *freezerTableMeta) error {
+// decodeV2 attempts to decode the metadata structure in v2 format. If it fails,
+// or the result is incompatible, nil is returned.
+func decodeV2(file *os.File) *freezerTableMeta {
     _, err := file.Seek(0, io.SeekStart)
     if err != nil {
-        return err
+        return nil
     }
-    return rlp.Encode(file, meta)
+    type obj struct {
+        Version uint16
+        Tail    uint64
+        Offset  uint64
+    }
+    var o obj
+    if err := rlp.Decode(file, &o); err != nil {
+        return nil
+    }
+    if o.Version != freezerTableV2 {
+        return nil
+    }
+    return &freezerTableMeta{
+        file:             file,
+        version:          freezerTableV2,
+        virtualTail:      o.Tail,
+        indexFlushOffset: o.Offset,
+    }
 }
 
-// loadMetadata loads the metadata from the given metadata file.
-// Initializes the metadata file with the given "actual tail" if
-// it's empty.
-func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) {
+// newMetadata initializes the metadata object, either by loading the content
+// from the file or by constructing a new one from scratch.
+func newMetadata(file *os.File) (*freezerTableMeta, error) {
     stat, err := file.Stat()
     if err != nil {
         return nil, err
     }
-    // Write the metadata with the given actual tail into metadata file
-    // if it's non-existent. There are two possible scenarios here:
-    // - the freezer table is empty
-    // - the freezer table is legacy
-    // In both cases, write the meta into the file with the actual tail
-    // as the virtual tail.
     if stat.Size() == 0 {
-        m := newMetadata(tail)
-        if err := writeMetadata(file, m); err != nil {
+        m := &freezerTableMeta{
+            file:             file,
+            version:          freezerTableV2,
+            virtualTail:      0,
+            indexFlushOffset: 0,
+        }
+        if err := m.write(true); err != nil {
             return nil, err
         }
         return m, nil
     }
-    m, err := readMetadata(file)
-    if err != nil {
-        return nil, err
+    if m := decodeV2(file); m != nil {
+        return m, nil
     }
-    // Update the virtual tail with the given actual tail if it's even
-    // lower than it. Theoretically it shouldn't happen at all, print
-    // a warning here.
-    if m.VirtualTail < tail {
-        log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail)
-        m.VirtualTail = tail
-        if err := writeMetadata(file, m); err != nil {
+    if m := decodeV1(file); m != nil {
+        // Upgrade the existing legacy metadata to the new version
+        if err := m.write(true); err != nil {
             return nil, err
         }
+        return m, nil
+    }
+    return nil, errors.New("failed to decode metadata")
+}
+
+// setVirtualTail sets the virtual tail and flushes the metadata.
+func (m *freezerTableMeta) setVirtualTail(tail uint64, sync bool) error {
+    m.virtualTail = tail
+    return m.write(sync)
+}
+
+// setIndexFlushOffset sets the flush offset and flushes the metadata.
+func (m *freezerTableMeta) setIndexFlushOffset(offset uint64, sync bool) error {
+    m.indexFlushOffset = offset
+    return m.write(sync)
+}
+
+// write flushes the content of the metadata into the file and performs an
+// fsync if requested.
+func (m *freezerTableMeta) write(sync bool) error {
+    type obj struct {
+        Version uint16
+        Tail    uint64
+        Offset  uint64
+    }
+    var o obj
+    o.Version = m.version
+    o.Tail = m.virtualTail
+    o.Offset = m.indexFlushOffset
+
+    _, err := m.file.Seek(0, io.SeekStart)
+    if err != nil {
+        return err
+    }
+    if err := rlp.Encode(m.file, &o); err != nil {
+        return err
     }
-    return m, nil
+    if !sync {
+        return nil
+    }
+    return m.file.Sync()
 }

@@ -19,6 +19,8 @@ package rawdb
 import (
     "os"
     "testing"
+
+    "github.com/ethereum/go-ethereum/rlp"
 )
 
 func TestReadWriteFreezerTableMeta(t *testing.T) {
@@ -27,36 +29,81 @@ func TestReadWriteFreezerTableMeta(t *testing.T) {
         t.Fatalf("Failed to create file %v", err)
     }
     defer f.Close()
-    err = writeMetadata(f, newMetadata(100))
+
+    meta, err := newMetadata(f)
     if err != nil {
-        t.Fatalf("Failed to write metadata %v", err)
+        t.Fatalf("Failed to new metadata %v", err)
     }
-    meta, err := readMetadata(f)
+    meta.setVirtualTail(100, false)
+
+    meta, err = newMetadata(f)
     if err != nil {
-        t.Fatalf("Failed to read metadata %v", err)
+        t.Fatalf("Failed to reload metadata %v", err)
     }
-    if meta.Version != freezerVersion {
+    if meta.version != freezerTableV2 {
         t.Fatalf("Unexpected version field")
     }
-    if meta.VirtualTail != uint64(100) {
+    if meta.virtualTail != uint64(100) {
         t.Fatalf("Unexpected virtual tail field")
     }
 }
 
-func TestInitializeFreezerTableMeta(t *testing.T) {
+func TestUpgradeMetadata(t *testing.T) {
     f, err := os.CreateTemp(t.TempDir(), "*")
     if err != nil {
         t.Fatalf("Failed to create file %v", err)
     }
     defer f.Close()
-    meta, err := loadMetadata(f, uint64(100))
+
+    // Write legacy metadata into the file
+    type obj struct {
+        Version uint16
+        Tail    uint64
+    }
+    var o obj
+    o.Version = freezerTableV1
+    o.Tail = 100
+    if err := rlp.Encode(f, &o); err != nil {
+        t.Fatalf("Failed to encode %v", err)
+    }
+
+    // Reload the metadata, a silent upgrade is expected
+    meta, err := newMetadata(f)
     if err != nil {
         t.Fatalf("Failed to read metadata %v", err)
     }
-    if meta.Version != freezerVersion {
+    if meta.version != freezerTableV2 {
         t.Fatalf("Unexpected version field")
     }
-    if meta.VirtualTail != uint64(100) {
+    if meta.virtualTail != uint64(100) {
         t.Fatalf("Unexpected virtual tail field")
     }
 }
+
+func TestInvalidMetadata(t *testing.T) {
+    f, err := os.CreateTemp(t.TempDir(), "*")
+    if err != nil {
+        t.Fatalf("Failed to create file %v", err)
+    }
+    defer f.Close()
+
+    // Write invalid legacy metadata into the file
+    type obj struct {
+        Version uint16
+        Tail    uint64
+    }
+    var o obj
+    o.Version = freezerTableV2 // -> invalid version tag
+    o.Tail = 100
+    if err := rlp.Encode(f, &o); err != nil {
+        t.Fatalf("Failed to encode %v", err)
+    }
+    // Reloading the metadata is expected to fail, since a v2 version tag on a
+    // two-field payload is not decodable in either format
+    _, err = newMetadata(f)
+    if err == nil {
+        t.Fatal("Unexpected success")
+    }
+}

@@ -49,7 +49,7 @@ type resettableFreezer struct {
 //
 // The reset function will delete directory atomically and re-create the
 // freezer from scratch.
-func newResettableFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*resettableFreezer, error) {
+func newResettableFreezer(datadir string, namespace string, readonly bool, maxTableSize map[string]uint32, tables map[string]bool) (*resettableFreezer, error) {
     if err := cleanup(datadir); err != nil {
         return nil, err
     }

@@ -33,7 +33,7 @@ func TestResetFreezer(t *testing.T) {
         {1, bytes.Repeat([]byte{1}, 2048)},
         {2, bytes.Repeat([]byte{2}, 2048)},
     }
-    f, _ := newResettableFreezer(t.TempDir(), "", false, 2048, freezerTestTableDef)
+    f, _ := newResettableFreezer(t.TempDir(), "", false, freezerTestTableSize, freezerTestTableDef)
     defer f.Close()
 
     f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
@@ -87,7 +87,7 @@ func TestFreezerCleanup(t *testing.T) {
         {2, bytes.Repeat([]byte{2}, 2048)},
     }
     datadir := t.TempDir()
-    f, _ := newResettableFreezer(datadir, "", false, 2048, freezerTestTableDef)
+    f, _ := newResettableFreezer(datadir, "", false, freezerTestTableSize, freezerTestTableDef)
     f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
         for _, item := range items {
             op.AppendRaw("test", item.id, item.blob)
@@ -98,7 +98,7 @@ func TestFreezerCleanup(t *testing.T) {
     os.Rename(datadir, tmpName(datadir))
 
     // Open the freezer again, trigger cleanup operation
-    f, _ = newResettableFreezer(datadir, "", false, 2048, freezerTestTableDef)
+    f, _ = newResettableFreezer(datadir, "", false, freezerTestTableSize, freezerTestTableDef)
     f.Close()
 
     if _, err := os.Lstat(tmpName(datadir)); !os.IsNotExist(err) {

@@ -108,11 +108,12 @@ type freezerTable struct {
     head   *os.File            // File descriptor for the data head of the table
     index  *os.File            // File descriptor for the indexEntry file of the table
-    meta   *os.File            // File descriptor for metadata of the table
     files  map[uint32]*os.File // open files
     headId uint32              // number of the currently active head file
     tailId uint32              // number of the earliest file
 
+    metadata *freezerTableMeta // metadata of the table
+
     headBytes  int64         // Number of bytes written to the head file
     readMeter  metrics.Meter // Meter for measuring the effective amount of data read
     writeMeter metrics.Meter // Meter for measuring the effective amount of data written
@@ -123,8 +124,8 @@ type freezerTable struct {
 }
 
 // newFreezerTable opens the given path as a freezer table.
-func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) {
-    return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly)
+func newFreezerTable(path, name string, disableSnappy, readonly bool, maxSize uint32) (*freezerTable, error) {
+    return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, maxSize, disableSnappy, readonly)
 }
 
 // newTable opens a freezer table, creating the data and index files if they are
@@ -166,10 +167,15 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
             return nil, err
         }
     }
+    // Load metadata from the file
+    metadata, err := newMetadata(meta)
+    if err != nil {
+        return nil, err
+    }
     // Create the table and repair any past inconsistency
     tab := &freezerTable{
         index:      index,
-        meta:       meta,
+        metadata:   metadata,
         files:      make(map[uint32]*os.File),
         readMeter:  readMeter,
         writeMeter: writeMeter,
@@ -222,8 +228,9 @@ func (t *freezerTable) repair() error {
         } // New file can't trigger this path
     }
     // Validate the index file as it might contain some garbage data after the
-    // power failures.
-    if err := t.repairIndex(); err != nil {
+    // power failures. The data before the 'IndexFlushOffset' is considered
+    // fully flushed and can be skipped for verification.
+    if err := t.repairIndex(int64(t.metadata.indexFlushOffset)); err != nil {
         return err
     }
     // Retrieve the file sizes and prepare for truncation. Note the file size
@@ -253,12 +260,14 @@ func (t *freezerTable) repair() error {
     t.tailId = firstIndex.filenum
     t.itemOffset.Store(uint64(firstIndex.offset))
 
-    // Load metadata from the file
-    meta, err := loadMetadata(t.meta, t.itemOffset.Load())
-    if err != nil {
-        return err
+    // Adjust the number of hidden items if it is less than the number of items
+    // being removed.
+    if t.itemOffset.Load() > t.metadata.virtualTail {
+        if err := t.metadata.setVirtualTail(t.itemOffset.Load(), true); err != nil {
+            return err
+        }
     }
-    t.itemHidden.Store(meta.VirtualTail)
+    t.itemHidden.Store(t.metadata.virtualTail)
 
     // Read the last index, use the default value in case the freezer is empty
     if offsetsSize == indexEntrySize {
@@ -304,6 +313,17 @@ func (t *freezerTable) repair() error {
         // Truncate the index to point within the head file
         if contentExp > contentSize {
             t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize)
+
+            // If table truncation removes all items after the indexFlushOffset, reset it
+            // to zero to force a full validation of the index file on the next restart.
+            //
+            // This operation should be performed before modifying the index file. In the
+            // worst-case scenario, the offset modification could be persisted without any
+            // changes to the index file due to an unexpected power failure, resulting in
+            // some additional validation workload, which is an acceptable consequence.
+            if t.metadata.indexFlushOffset > uint64(offsetsSize-indexEntrySize) {
+                t.metadata.setIndexFlushOffset(0, true)
+            }
             if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
                 return err
             }
@@ -345,9 +365,6 @@ func (t *freezerTable) repair() error {
         if err := t.head.Sync(); err != nil {
             return err
         }
-        if err := t.meta.Sync(); err != nil {
-            return err
-        }
     }
     // Update the item and byte counters and return
     t.items.Store(t.itemOffset.Load() + uint64(offsetsSize/indexEntrySize-1)) // last indexEntry points to the end of the data file
@@ -392,7 +409,7 @@ func (t *freezerTable) repair() error {
 // leftover garbage or if all items in the table have zero size is impossible.
 // In such instances, the file will remain unchanged to prevent potential data
 // loss or misinterpretation.
-func (t *freezerTable) repairIndex() error {
+func (t *freezerTable) repairIndex(checkOffset int64) error {
     // Retrieve the file sizes and prepare for validation
     stat, err := t.index.Stat()
     if err != nil {
@@ -400,8 +417,19 @@ func (t *freezerTable) repairIndex() error {
     }
     size := stat.Size()
 
-    // Move the read cursor to the beginning of the file
-    _, err = t.index.Seek(0, io.SeekStart)
+    // Short circuit if the specified start offset is out of range.
+    if checkOffset > size {
+        return fmt.Errorf("index check offset out of range, offset: %d, size: %d", checkOffset, size)
+    }
+    // Short circuit if the offset points to the end of the index file;
+    // this can happen when all items in the latest data file have been
+    // truncated. In this case, all the items in the index file are
+    // fully synced and there is nothing to validate.
+    if checkOffset == size {
+        return nil
+    }
+    // Move the read cursor to the specified offset
+    _, err = t.index.Seek(checkOffset, io.SeekStart)
     if err != nil {
         return err
     }
@@ -411,7 +439,6 @@ func (t *freezerTable) repairIndex(checkOffset int64) error {
         start = time.Now()
         buff  = make([]byte, indexEntrySize)
         prev  indexEntry
-        head  indexEntry
 
         read = func() (indexEntry, error) {
             n, err := io.ReadFull(fr, buff)
@@ -436,27 +463,27 @@ func (t *freezerTable) repairIndex(checkOffset int64) error {
             return nil
         }
     )
-    for offset := int64(0); offset < size; offset += indexEntrySize {
+    for offset := checkOffset; offset < size; offset += indexEntrySize {
         entry, err := read()
         if err != nil {
             return err
         }
-        if offset == 0 {
-            head = entry
+        if offset == checkOffset {
+            prev = entry
             continue
         }
-        // Ensure that the first non-head index refers to the earliest file,
-        // or the next file if the earliest file has no space to place the
-        // first item.
+        // Specialize if the index validation starts from zero, in which case
+        // the "offset" field of the first index entry represents the number of
+        // deleted items from the tail, rather than an offset in the data file.
+        // Therefore, skip the offset validation for the first two entries.
         if offset == indexEntrySize {
-            if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
-                log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum)
+            if entry.filenum != prev.filenum && entry.filenum != prev.filenum+1 {
+                log.Error("Corrupted index item detected", "earliest", prev.filenum, "next", entry.filenum)
                 return truncate(offset)
             }
             prev = entry
             continue
         }
+        // Ensure two consecutive index items are in order
         if err := t.checkIndexItems(prev, entry); err != nil {
             log.Error("Corrupted index item detected", "err", err)
             return truncate(offset)
@@ -470,7 +497,7 @@ func (t *freezerTable) repairIndex(checkOffset int64) error {
     if err != nil {
         return err
     }
-    log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
+    log.Debug("Verified index file", "items", (size-checkOffset)/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
     return nil
 }
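With the checkpoint in place, validation cost is proportional to the unflushed suffix of the index rather than the entire file. The toy function below mirrors that idea on an in-memory slice; the entry type and the ordering rule are simplified from checkIndexItems, so treat it as a sketch of the principle rather than the real check.

    package main

    import "fmt"

    // entry is a simplified stand-in for the freezer's indexEntry: the data
    // file an item lives in and the byte offset where the item ends in it.
    type entry struct {
        filenum uint32
        offset  uint32
    }

    // validateFrom mirrors the idea behind repairIndex's checkOffset parameter:
    // entries before 'from' were fsync'd and are trusted, so only the suffix is
    // checked for ordering. It returns how many trailing entries to truncate.
    func validateFrom(entries []entry, from int) int {
        for i := from + 1; i < len(entries); i++ {
            prev, cur := entries[i-1], entries[i]
            ok := (cur.filenum == prev.filenum && cur.offset >= prev.offset) ||
                cur.filenum == prev.filenum+1 // item spilled into the next data file
            if !ok {
                return len(entries) - i // drop the unordered suffix
            }
        }
        return 0
    }

    func main() {
        idx := []entry{{0, 10}, {0, 20}, {1, 15}, {9, 9}} // last entry is garbage
        fmt.Println("entries to drop:", validateFrom(idx, 1)) // checks index 1 onward
    }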
@@ -550,6 +577,17 @@ func (t *freezerTable) truncateHead(items uint64) error {
     // Truncate the index file first, the tail position is also considered
     // when calculating the new freezer table length.
     length := items - t.itemOffset.Load()
+
+    // If head truncation removes all items after the indexFlushOffset, reset it
+    // to zero to force a full validation of the index file on the next restart.
+    //
+    // This operation should be performed before modifying the index file. In the
+    // worst-case scenario, the offset modification could be persisted without any
+    // changes to the index file due to an unexpected power failure, resulting in
+    // some additional validation workload, which is an acceptable consequence.
+    if t.metadata.indexFlushOffset > (length+1)*indexEntrySize {
+        t.metadata.setIndexFlushOffset(0, true)
+    }
     if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
         return err
     }
@@ -652,7 +690,10 @@ func (t *freezerTable) truncateTail(items uint64) error {
     }
     // Update the virtual tail marker and hidden these entries in table.
     t.itemHidden.Store(items)
-    if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
+
+    // Update the virtual tail without fsync, otherwise it will significantly
+    // impact the overall performance.
+    if err := t.metadata.setVirtualTail(items, false); err != nil {
         return err
     }
     // Hidden items still fall in the current tail file, no data file
@@ -681,10 +722,24 @@ func (t *freezerTable) truncateTail(items uint64) error {
         }
         newDeleted = current
     }
-    // Commit the changes of metadata file first before manipulating
-    // the indexes file.
-    if err := t.meta.Sync(); err != nil {
-        return err
+    // Move the index flush offset backward to account for the deleted index segment.
+    //
+    // This operation should be performed before modifying the index file. In the
+    // worst-case scenario, the offset modification could be persisted without any
+    // changes to the index file due to an unexpected power failure, resulting in
+    // some additional validation workload, which is an acceptable consequence.
+    shorten := indexEntrySize * (newDeleted - deleted)
+    if t.metadata.indexFlushOffset <= shorten {
+        // This is never expected to happen; reset the flush offset to zero
+        // just in case.
+        t.logger.Error("Reset the index flush offset", "current", t.metadata.indexFlushOffset, "shorten", shorten)
+        if err := t.metadata.setIndexFlushOffset(0, true); err != nil {
+            return err
+        }
+    } else {
+        if err := t.metadata.setIndexFlushOffset(t.metadata.indexFlushOffset-shorten, true); err != nil {
+            return err
+        }
     }
     // Close the index file before shorten it.
     if err := t.index.Close(); err != nil {
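The offset arithmetic is easy to sanity-check in isolation: deleting N leading index entries shifts the flushed-prefix watermark back by N*indexEntrySize, and any would-be underflow degrades safely to a full re-validation. A small sketch of the same rule, where adjustFlushOffset is a made-up helper rather than freezer code:

    package main

    import "fmt"

    const indexEntrySize = 6 // same size the freezer uses for one index entry

    // adjustFlushOffset applies the truncateTail rule above: when 'removed'
    // leading index entries are deleted, the flushed-prefix marker must shift
    // back by the same number of bytes; if that would underflow, fall back to
    // zero and force a full re-validation on the next startup.
    func adjustFlushOffset(offset, removed uint64) uint64 {
        shorten := uint64(indexEntrySize) * removed
        if offset <= shorten {
            return 0 // unexpected; be conservative
        }
        return offset - shorten
    }

    func main() {
        // 41 entries flushed, 10 tail entries deleted -> 31 entries remain flushed.
        fmt.Println(adjustFlushOffset(41*indexEntrySize, 10)) // 186 = 31*6
    }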
@@ -746,7 +801,7 @@ func (t *freezerTable) Close() error {
     // Trying to fsync a file opened in rdonly causes "Access denied"
     // error on Windows.
     doClose(t.index, true, true)
-    doClose(t.meta, true, true)
+    doClose(t.metadata.file, true, true)
 
     // The preopened non-head data-files are all opened in readonly.
     // The head is opened in rw-mode, so we sync it here - but since it's also
@@ -757,7 +812,6 @@ func (t *freezerTable) Close() error {
         doClose(f, false, true) // close but do not sync
     }
     t.index = nil
-    t.meta = nil
     t.head = nil
 
     if errs != nil {
@@ -917,7 +971,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
     defer t.lock.RUnlock()
 
     // Ensure the table and the item are accessible
-    if t.index == nil || t.head == nil || t.meta == nil {
+    if t.index == nil || t.head == nil {
         return nil, nil, errClosed
     }
     var (
@@ -1042,6 +1096,21 @@ func (t *freezerTable) advanceHead() error {
     t.lock.Lock()
     defer t.lock.Unlock()
 
+    // Flush the index file content to disk whenever the head is advanced.
+    // It must be performed before updating the index flush offset.
+    if err := t.index.Sync(); err != nil {
+        return err
+    }
+    // Retrieve the size of the fully flushed index file
+    stat, err := t.index.Stat()
+    if err != nil {
+        return err
+    }
+    // Move the index flush offset forward to skip the unnecessary validation
+    // workload on the next startup.
+    if err := t.metadata.setIndexFlushOffset(uint64(stat.Size()), true); err != nil {
+        return err
+    }
+
     // We open the next file in truncated mode -- if this file already
     // exists, we need to start over from scratch on it.
     nextID := t.headId + 1
@@ -1069,7 +1138,7 @@ func (t *freezerTable) Sync() error {
 func (t *freezerTable) Sync() error {
     t.lock.Lock()
     defer t.lock.Unlock()
-    if t.index == nil || t.head == nil || t.meta == nil {
+    if t.index == nil || t.head == nil {
         return errClosed
     }
     var err error
@@ -1078,10 +1147,9 @@ func (t *freezerTable) Sync() error {
             err = e
         }
     }
     trackError(t.index.Sync())
-    trackError(t.meta.Sync())
     trackError(t.head.Sync())
+    trackError(t.metadata.file.Sync())
     return err
 }
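The ordering in advanceHead (sync the index first, record the watermark second) is what keeps the recorded offset from ever overstating what is durable on disk. A stripped-down sketch of that pattern, where saveOffset stands in for freezerTableMeta.setIndexFlushOffset:

    package main

    import (
        "fmt"
        "os"
    )

    // recordFlushed sketches the crash-safety ordering used by advanceHead: the
    // index file is fsync'd before the flushed size is recorded, so the saved
    // offset never claims more durability than the disk actually has.
    func recordFlushed(index *os.File, saveOffset func(uint64) error) error {
        if err := index.Sync(); err != nil { // make the index contents durable first
            return err
        }
        stat, err := index.Stat()
        if err != nil {
            return err
        }
        return saveOffset(uint64(stat.Size())) // persist the watermark afterwards
    }

    func main() {
        f, err := os.CreateTemp("", "index-*")
        if err != nil {
            panic(err)
        }
        defer os.Remove(f.Name())
        defer f.Close()

        f.WriteString("sixteen idx bytes") // 17 bytes of pretend index entries
        if err := recordFlushed(f, func(off uint64) error {
            fmt.Println("flushed up to byte", off)
            return nil
        }); err != nil {
            panic(err)
        }
    }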
@@ -1097,13 +1165,8 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string {
 }
 
 func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
-    meta, err := readMetadata(t.meta)
-    if err != nil {
-        fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
-        return
-    }
-    fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version,
-        t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
+    fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n",
+        t.metadata.version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
 
     buf := make([]byte, indexEntrySize)

@@ -206,6 +206,19 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
         idxFile.Truncate(2*indexEntrySize + indexEntrySize/2)
         idxFile.Close()
 
+        // reset the check offset
+        m := filepath.Join(os.TempDir(), fmt.Sprintf("%s.meta", fname))
+        metaFile, err := os.OpenFile(m, os.O_RDWR, 0644)
+        if err != nil {
+            t.Fatal(err)
+        }
+        meta, err := newMetadata(metaFile)
+        if err != nil {
+            t.Fatal(err)
+        }
+        meta.setIndexFlushOffset(0, true)
+        metaFile.Close()
     }
     // Now open it again
     {
         f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false)
@@ -262,6 +275,19 @@ func TestSnappyDetection(t *testing.T) {
         f.Close()
     }
 
+    // reset the check offset
+    m := filepath.Join(os.TempDir(), fmt.Sprintf("%s.meta", fname))
+    metaFile, err := os.OpenFile(m, os.O_RDWR, 0644)
+    if err != nil {
+        t.Fatal(err)
+    }
+    meta, err := newMetadata(metaFile)
+    if err != nil {
+        t.Fatal(err)
+    }
+    meta.setIndexFlushOffset(0, true)
+    metaFile.Close()
+
     // Open without snappy
     {
         f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false)
@@ -521,93 +547,53 @@ func TestFreezerOffset(t *testing.T) {
     fname := fmt.Sprintf("offset-%d", rand.Uint64())
 
     // Fill table
-    {
-        f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
-        if err != nil {
-            t.Fatal(err)
-        }
-        // Write 6 x 20 bytes, splitting out into three files
-        batch := f.newBatch()
-        require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
-        require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
-        require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
-        require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
-        require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
-        require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
-        require.NoError(t, batch.commit())
-        t.Log(f.dumpIndexString(0, 100))
-        f.Close()
-    }
+    f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
+    if err != nil {
+        t.Fatal(err)
+    }
+    // Write 6 x 20 bytes, splitting out into three files
+    batch := f.newBatch()
+    require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
+    require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
+    require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
+    require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
+    require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
+    require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
+    require.NoError(t, batch.commit())
+    t.Log(f.dumpIndexString(0, 100))
 
-    // Now crop it.
-    {
-        // delete files 0 and 1
-        for i := 0; i < 2; i++ {
-            p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i))
-            if err := os.Remove(p); err != nil {
-                t.Fatal(err)
-            }
-        }
-        // Read the index file
-        p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
-        indexFile, err := os.OpenFile(p, os.O_RDWR, 0644)
-        if err != nil {
-            t.Fatal(err)
-        }
-        indexBuf := make([]byte, 7*indexEntrySize)
-        indexFile.Read(indexBuf)
-
-        // Update the index file, so that we store
-        // [ file = 2, offset = 4 ] at index zero
-
-        zeroIndex := indexEntry{
-            filenum: uint32(2), // First file is 2
-            offset:  uint32(4), // We have removed four items
-        }
-        buf := zeroIndex.append(nil)
-
-        // Overwrite index zero
-        copy(indexBuf, buf)
-
-        // Remove the four next indices by overwriting
-        copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:])
-        indexFile.WriteAt(indexBuf, 0)
-
-        // Need to truncate the moved index items
-        indexFile.Truncate(indexEntrySize * (1 + 2))
-        indexFile.Close()
-    }
+    // Now crop it.
+    f.truncateTail(4)
+    f.Close()
 
     // Now open again
-    {
-        f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
-        if err != nil {
-            t.Fatal(err)
-        }
-        defer f.Close()
-        t.Log(f.dumpIndexString(0, 100))
+    f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
+    if err != nil {
+        t.Fatal(err)
+    }
+    t.Log(f.dumpIndexString(0, 100))
 
-        // It should allow writing item 6.
-        batch := f.newBatch()
-        require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
-        require.NoError(t, batch.commit())
+    // It should allow writing item 6.
+    batch = f.newBatch()
+    require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
+    require.NoError(t, batch.commit())
 
-        checkRetrieveError(t, f, map[uint64]error{
-            0: errOutOfBounds,
-            1: errOutOfBounds,
-            2: errOutOfBounds,
-            3: errOutOfBounds,
-        })
-        checkRetrieve(t, f, map[uint64][]byte{
-            4: getChunk(20, 0xbb),
-            5: getChunk(20, 0xaa),
-            6: getChunk(20, 0x99),
-        })
-    }
+    checkRetrieveError(t, f, map[uint64]error{
+        0: errOutOfBounds,
+        1: errOutOfBounds,
+        2: errOutOfBounds,
+        3: errOutOfBounds,
+    })
+    checkRetrieve(t, f, map[uint64][]byte{
+        4: getChunk(20, 0xbb),
+        5: getChunk(20, 0xaa),
+        6: getChunk(20, 0x99),
+    })
+    f.Close()
     // Edit the index again, with a much larger initial offset of 1M.
     {
@@ -633,6 +619,19 @@ func TestFreezerOffset(t *testing.T) {
         copy(indexBuf, buf)
         indexFile.WriteAt(indexBuf, 0)
         indexFile.Close()
+
+        // reset the check offset
+        m := filepath.Join(os.TempDir(), fmt.Sprintf("%s.meta", fname))
+        metaFile, err := os.OpenFile(m, os.O_RDWR, 0644)
+        if err != nil {
+            t.Fatal(err)
+        }
+        meta, err := newMetadata(metaFile)
+        if err != nil {
+            t.Fatal(err)
+        }
+        meta.setIndexFlushOffset(0, true)
+        metaFile.Close()
     }
 
     // Check that existing items have been moved to index 1M.
@@ -1369,45 +1368,63 @@ func TestRandom(t *testing.T) {
 }
 
 func TestIndexValidation(t *testing.T) {
-    const (
-        items    = 30
-        dataSize = 10
-    )
+    const dataSize = 10
+
     garbage := indexEntry{
         filenum: 100,
         offset:  200,
     }
     var cases = []struct {
-        offset   int64
-        data     []byte
-        expItems int
+        write         int
+        offset        int64
+        data          []byte
+        expItems      int
+        hasCorruption bool
     }{
         // extend index file with zero bytes at the end
         {
-            offset:   (items + 1) * indexEntrySize,
+            write:    5,
+            offset:   (5 + 1) * indexEntrySize,
             data:     make([]byte, indexEntrySize),
-            expItems: 30,
+            expItems: 5,
+        },
+        // extend index file with unaligned zero bytes at the end
+        {
+            write:    5,
+            offset:   (5 + 1) * indexEntrySize,
+            data:     make([]byte, indexEntrySize*1.5),
+            expItems: 5,
         },
         // write garbage in the first non-head item
         {
+            write:    5,
             offset:   indexEntrySize,
             data:     garbage.append(nil),
             expItems: 0,
         },
-        // write garbage in the first non-head item
+        // write garbage in the middle
         {
-            offset:   (items/2 + 1) * indexEntrySize,
+            write:    5,
+            offset:   3 * indexEntrySize,
             data:     garbage.append(nil),
-            expItems: items / 2,
+            expItems: 2,
+        },
+        // fulfill the first data file (but not yet advanced), the zero bytes
+        // at tail should be truncated.
+        {
+            write:    10,
+            offset:   11 * indexEntrySize,
+            data:     garbage.append(nil),
+            expItems: 10,
         },
     }
     for _, c := range cases {
         fn := fmt.Sprintf("t-%d", rand.Uint64())
-        f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+        f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 10*dataSize, true, false)
         if err != nil {
             t.Fatal(err)
         }
-        writeChunks(t, f, items, dataSize)
+        writeChunks(t, f, c.write, dataSize)
 
         // write corrupted data
         f.index.WriteAt(c.data, c.offset)
@@ -1421,10 +1438,10 @@ func TestIndexValidation(t *testing.T) {
         for i := 0; i < c.expItems; i++ {
             exp := getChunk(10, i)
             got, err := f.Retrieve(uint64(i))
-            if err != nil {
+            if err != nil && !c.hasCorruption {
                 t.Fatalf("Failed to read from table, %v", err)
             }
-            if !bytes.Equal(exp, got) {
+            if !bytes.Equal(exp, got) && !c.hasCorruption {
                 t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
             }
         }
@@ -1433,3 +1450,114 @@ func TestIndexValidation(t *testing.T) {
         }
     }
 }
+
+func TestIndexFlushOffsetTracking(t *testing.T) {
+    const (
+        items    = 35
+        dataSize = 10
+        fileSize = 100
+    )
+    fn := fmt.Sprintf("t-%d", rand.Uint64())
+    f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
+    if err != nil {
+        t.Fatal(err)
+    }
+    // Data files:
+    // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
+    writeChunks(t, f, items, dataSize)
+
+    var cases = []struct {
+        op     func(*freezerTable)
+        offset uint64
+    }{
+        {
+            // Data files:
+            // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
+            func(f *freezerTable) {}, // no-op
+            31 * indexEntrySize,
+        },
+        {
+            // Write more items to fulfill the newest data file, but the file
+            // advance is not triggered.
+            // Data files:
+            // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items, full)
+            func(f *freezerTable) {
+                batch := f.newBatch()
+                for i := 0; i < 5; i++ {
+                    batch.AppendRaw(items+uint64(i), make([]byte, dataSize))
+                }
+                batch.commit()
+            },
+            31 * indexEntrySize,
+        },
+        {
+            // Write more items to trigger the data file advance
+            // Data files:
+            // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(1 item)
+            func(f *freezerTable) {
+                batch := f.newBatch()
+                batch.AppendRaw(items+5, make([]byte, dataSize))
+                batch.commit()
+            },
+            41 * indexEntrySize,
+        },
+        {
+            // Head truncate
+            // Data files:
+            // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 items)
+            func(f *freezerTable) {
+                f.truncateHead(items + 5)
+            },
+            41 * indexEntrySize,
+        },
+        {
+            // Tail truncate
+            // Data files:
+            // F1(1 hidden, 9 visible) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 items)
+            func(f *freezerTable) {
+                f.truncateTail(1)
+            },
+            41 * indexEntrySize,
+        },
+        {
+            // Tail truncate
+            // Data files:
+            // F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 items)
+            func(f *freezerTable) {
+                f.truncateTail(10)
+            },
+            31 * indexEntrySize,
+        },
+        {
+            // Tail truncate
+            // Data files:
+            // F4(10 items) -> F5(0 items)
+            func(f *freezerTable) {
+                f.truncateTail(30)
+            },
+            11 * indexEntrySize,
+        },
+        {
+            // Head truncate
+            // Data files:
+            // F4(9 items)
+            func(f *freezerTable) {
+                f.truncateHead(items + 4)
+            },
+            0,
+        },
+    }
+    for _, c := range cases {
+        c.op(f)
+        if f.metadata.indexFlushOffset != c.offset {
+            t.Fatalf("Unexpected index flush offset, want: %d, got: %d", c.offset, f.metadata.indexFlushOffset)
+        }
+    }
+}

@@ -31,7 +31,10 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-var freezerTestTableDef = map[string]bool{"test": true}
+var (
+    freezerTestTableDef  = map[string]bool{"test": true}
+    freezerTestTableSize = map[string]uint32{"test": 2049}
+)
 
 func TestFreezerModify(t *testing.T) {
     t.Parallel()
@@ -48,7 +51,7 @@ func TestFreezerModify(t *testing.T) {
     }
     tables := map[string]bool{"raw": true, "rlp": false}
-    f, _ := newFreezerForTesting(t, tables)
+    f, _ := newFreezerForTesting(t, tables, map[string]uint32{"raw": 2049, "rlp": 2049})
     defer f.Close()
 
     // Commit test data.
@@ -93,7 +96,7 @@ func TestFreezerModify(t *testing.T) {
 func TestFreezerModifyRollback(t *testing.T) {
     t.Parallel()
 
-    f, dir := newFreezerForTesting(t, freezerTestTableDef)
+    f, dir := newFreezerForTesting(t, freezerTestTableDef, freezerTestTableSize)
 
     theError := errors.New("oops")
     _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
@@ -112,7 +115,7 @@ func TestFreezerModifyRollback(t *testing.T) {
     // Reopen and check that the rolled-back data doesn't reappear.
     tables := map[string]bool{"test": true}
-    f2, err := NewFreezer(dir, "", false, 2049, tables)
+    f2, err := NewFreezer(dir, "", false, map[string]uint32{"test": 2049}, tables)
     if err != nil {
         t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err)
     }
@@ -124,7 +127,7 @@ func TestFreezerModifyRollback(t *testing.T) {
 func TestFreezerConcurrentModifyRetrieve(t *testing.T) {
     t.Parallel()
 
-    f, _ := newFreezerForTesting(t, freezerTestTableDef)
+    f, _ := newFreezerForTesting(t, freezerTestTableDef, freezerTestTableSize)
     defer f.Close()
 
     var (
@@ -184,7 +187,7 @@ func TestFreezerConcurrentModifyRetrieve(t *testing.T) {
 // This test runs ModifyAncients and TruncateHead concurrently with each other.
 func TestFreezerConcurrentModifyTruncate(t *testing.T) {
-    f, _ := newFreezerForTesting(t, freezerTestTableDef)
+    f, _ := newFreezerForTesting(t, freezerTestTableDef, freezerTestTableSize)
     defer f.Close()
 
     var item = make([]byte, 256)
@@ -253,7 +256,7 @@ func TestFreezerReadonlyValidate(t *testing.T) {
     dir := t.TempDir()
     // Open non-readonly freezer and fill individual tables
     // with different amount of data.
-    f, err := NewFreezer(dir, "", false, 2049, tables)
+    f, err := NewFreezer(dir, "", false, map[string]uint32{"a": 2049, "b": 2049}, tables)
     if err != nil {
         t.Fatal("can't open freezer", err)
     }
@@ -276,7 +279,7 @@ func TestFreezerReadonlyValidate(t *testing.T) {
     // Re-opening as readonly should fail when validating
     // table lengths.
-    _, err = NewFreezer(dir, "", true, 2049, tables)
+    _, err = NewFreezer(dir, "", true, map[string]uint32{"a": 2049, "b": 2049}, tables)
     if err == nil {
         t.Fatal("readonly freezer should fail with differing table lengths")
     }
@@ -288,7 +291,7 @@ func TestFreezerConcurrentReadonly(t *testing.T) {
     tables := map[string]bool{"a": true}
     dir := t.TempDir()
 
-    f, err := NewFreezer(dir, "", false, 2049, tables)
+    f, err := NewFreezer(dir, "", false, map[string]uint32{"a": 2049}, tables)
     if err != nil {
         t.Fatal("can't open freezer", err)
     }
@@ -314,7 +317,7 @@ func TestFreezerConcurrentReadonly(t *testing.T) {
         go func(i int) {
             defer wg.Done()
 
-            f, err := NewFreezer(dir, "", true, 2049, tables)
+            f, err := NewFreezer(dir, "", true, map[string]uint32{"a": 2049}, tables)
             if err == nil {
                 fs[i] = f
             } else {
@@ -333,13 +336,13 @@ func TestFreezerConcurrentReadonly(t *testing.T) {
     }
 }
 
-func newFreezerForTesting(t *testing.T, tables map[string]bool) (*Freezer, string) {
+func newFreezerForTesting(t *testing.T, tables map[string]bool, sizes map[string]uint32) (*Freezer, string) {
     t.Helper()
 
     dir := t.TempDir()
     // note: using low max table size here to ensure the tests actually
     // switch between multiple files.
-    f, err := NewFreezer(dir, "", false, 2049, tables)
+    f, err := NewFreezer(dir, "", false, sizes, tables)
     if err != nil {
         t.Fatal("can't open freezer", err)
     }
@@ -379,7 +382,7 @@ func checkAncientCount(t *testing.T, f *Freezer, kind string, n uint64) {
 func TestFreezerCloseSync(t *testing.T) {
     t.Parallel()
 
-    f, _ := newFreezerForTesting(t, map[string]bool{"a": true, "b": true})
+    f, _ := newFreezerForTesting(t, map[string]bool{"a": true, "b": true}, map[string]uint32{"a": 2049, "b": 2049})
     defer f.Close()
 
     // Now, close and sync. This mimics the behaviour if the node is shut down,
@@ -401,19 +404,27 @@ func TestFreezerCloseSync(t *testing.T) {
 func TestFreezerSuite(t *testing.T) {
     ancienttest.TestAncientSuite(t, func(kinds []string) ethdb.AncientStore {
-        tables := make(map[string]bool)
+        var (
+            sizes  = make(map[string]uint32)
+            tables = make(map[string]bool)
+        )
         for _, kind := range kinds {
+            sizes[kind] = 2048
             tables[kind] = true
         }
-        f, _ := newFreezerForTesting(t, tables)
+        f, _ := newFreezerForTesting(t, tables, sizes)
         return f
     })
     ancienttest.TestResettableAncientSuite(t, func(kinds []string) ethdb.ResettableAncientStore {
-        tables := make(map[string]bool)
+        var (
+            sizes  = make(map[string]uint32)
+            tables = make(map[string]bool)
+        )
         for _, kind := range kinds {
+            sizes[kind] = 2048
             tables[kind] = true
         }
-        f, _ := newResettableFreezer(t.TempDir(), "", false, 2048, tables)
+        f, _ := newResettableFreezer(t.TempDir(), "", false, sizes, tables)
        return f
     })
 }
