Official Go implementation of the Ethereum protocol
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-ethereum/core/rawdb/freezer_batch.go

251 lines
7.2 KiB

// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"fmt"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/rlp"
"github.com/golang/snappy"
)
// This is the maximum amount of data that will be buffered in memory
// for a single freezer table batch.
const freezerBatchBufferLimit = 2 * 1024 * 1024
// freezerBatch is a write operation of multiple items on a freezer.
type freezerBatch struct {
tables map[string]*freezerTableBatch
}
func newFreezerBatch(f *Freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
}
return batch
}
// Append adds an RLP-encoded item of the given kind.
func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error {
return batch.tables[kind].Append(num, item)
}
// AppendRaw adds an item of the given kind.
func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error {
return batch.tables[kind].AppendRaw(num, item)
}
// reset initializes the batch.
func (batch *freezerBatch) reset() {
for _, tb := range batch.tables {
tb.reset()
}
}
// commit is called at the end of a write operation and
// writes all remaining data to tables.
func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
// Check that count agrees on all batches.
item = uint64(math.MaxUint64)
for name, tb := range batch.tables {
if item < math.MaxUint64 && tb.curItem != item {
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
}
item = tb.curItem
}
// Commit all table batches.
for _, tb := range batch.tables {
if err := tb.commit(); err != nil {
return 0, 0, err
}
writeSize += tb.totalBytes
}
return item, writeSize, nil
}
// freezerTableBatch is a batch for a freezer table.
type freezerTableBatch struct {
t *freezerTable
sb *snappyBuffer
encBuffer writeBuffer
dataBuffer []byte
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
}
// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
batch.reset()
return batch
}
// reset clears the batch for reuse.
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = batch.t.items.Load()
batch.totalBytes = 0
}
// Append rlp-encodes and adds data at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) Append(item uint64, data interface{}) error {
if item != batch.curItem {
return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem)
}
// Encode the item.
batch.encBuffer.Reset()
if err := rlp.Encode(&batch.encBuffer, data); err != nil {
return err
}
encItem := batch.encBuffer.data
if batch.sb != nil {
encItem = batch.sb.compress(encItem)
}
return batch.appendItem(encItem)
}
// AppendRaw injects a binary blob at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error {
if item != batch.curItem {
return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem)
}
encItem := blob
if batch.sb != nil {
encItem = batch.sb.compress(blob)
}
return batch.appendItem(encItem)
}
func (batch *freezerTableBatch) appendItem(data []byte) error {
// Check if item fits into current data file.
itemSize := int64(len(data))
itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer))
if itemOffset+itemSize > int64(batch.t.maxFileSize) {
// It doesn't fit, go to next file first.
if err := batch.commit(); err != nil {
return err
}
if err := batch.t.advanceHead(); err != nil {
return err
}
itemOffset = 0
}
// Put data to buffer.
batch.dataBuffer = append(batch.dataBuffer, data...)
batch.totalBytes += itemSize
// Put index entry to buffer.
entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)}
batch.indexBuffer = entry.append(batch.indexBuffer)
batch.curItem++
return batch.maybeCommit()
}
// maybeCommit writes the buffered data if the buffer is full enough.
func (batch *freezerTableBatch) maybeCommit() error {
if len(batch.dataBuffer) > freezerBatchBufferLimit {
return batch.commit()
}
return nil
}
core/rawdb: freezer index repair (#29792) This pull request removes the `fsync` of index files in freezer.ModifyAncients function for performance gain. Originally, fsync is added after each freezer write operation to ensure the written data is truly transferred into disk. Unfortunately, it turns out `fsync` can be relatively slow, especially on macOS (see https://github.com/ethereum/go-ethereum/issues/28754 for more information). In this pull request, fsync for index file is removed as it turns out index file can be recovered even after a unclean shutdown. But fsync for data file is still kept, as we have no meaningful way to validate the data correctness after unclean shutdown. --- **But why do we need the `fsync` in the first place?** As it's necessary for freezer to survive/recover after the machine crash (e.g. power failure). In linux, whenever the file write is performed, the file metadata update and data update are not necessarily performed at the same time. Typically, the metadata will be flushed/journalled ahead of the file data. Therefore, we make the pessimistic assumption that the file is first extended with invalid "garbage" data (normally zero bytes) and that afterwards the correct data replaces the garbage. We have observed that the index file of the freezer often contain garbage entry with zero value (filenumber = 0, offset = 0) after a machine power failure. It proves that the index file is extended without the data being flushed. And this corruption can destroy the whole freezer data eventually. Performing fsync after each write operation can reduce the time window for data to be transferred to the disk and ensure the correctness of the data in the disk to the greatest extent. --- **How can we maintain this guarantee without relying on fsync?** Because the items in the index file are strictly in order, we can leverage this characteristic to detect the corruption and truncate them when freezer is opened. Specifically these validation rules are performed for each index file: For two consecutive index items: - If their file numbers are the same, then the offset of the latter one MUST not be less than that of the former. - If the file number of the latter one is equal to that of the former plus one, then the offset of the latter one MUST not be 0. - If their file numbers are not equal, and the latter's file number is not equal to the former plus 1, the latter one is valid And also, for the first non-head item, it must refer to the earliest data file, or the next file if the earliest file is not sufficient to place the first item(very special case, only theoretical possible in tests) With these validation rules, we can detect the invalid item in index file with greatest possibility. --- But unfortunately, these scenarios are not covered and could still lead to a freezer corruption if it occurs: **All items in index file are in zero value** It's impossible to distinguish if they are truly zero (e.g. all the data entries maintained in freezer are zero size) or just the garbage left by OS. In this case, these index items will be kept by truncating the entire data file, namely the freezer is corrupted. However, we can consider that the probability of this situation occurring is quite low, and even if it occurs, the freezer can be considered to be close to an empty state. Rerun the state sync should be acceptable. **Index file is integral while relative data file is corrupted** It might be possible the data file is corrupted whose file size is extended correctly with garbage filled (e.g. zero bytes). In this case, it's impossible to detect the corruption by index validation. We can either choose to `fsync` the data file, or blindly believe that if index file is integral then the data file could be integral with very high chance. In this pull request, the first option is taken.
2 months ago
// commit writes the batched items to the backing freezerTable. Note index
// file isn't fsync'd after the file write, the recent write can be lost
// after the power failure.
func (batch *freezerTableBatch) commit() error {
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
}
if err := batch.t.head.Sync(); err != nil {
return err
}
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]
// Update headBytes of table.
batch.t.headBytes += dataSize
batch.t.items.Store(batch.curItem)
// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
batch.t.writeMeter.Mark(dataSize + indexSize)
return nil
}
// snappyBuffer writes snappy in block format, and can be reused. It is
// reset when WriteTo is called.
type snappyBuffer struct {
dst []byte
}
// compress snappy-compresses the data.
func (s *snappyBuffer) compress(data []byte) []byte {
// The snappy library does not care what the capacity of the buffer is,
// but only checks the length. If the length is too small, it will
// allocate a brand new buffer.
// To avoid that, we check the required size here, and grow the size of the
// buffer to utilize the full capacity.
if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n {
if cap(s.dst) < n {
s.dst = make([]byte, n)
}
s.dst = s.dst[:n]
}
s.dst = snappy.Encode(s.dst, data)
return s.dst
}
// writeBuffer implements io.Writer for a byte slice.
type writeBuffer struct {
data []byte
}
func (wb *writeBuffer) Write(data []byte) (int, error) {
wb.data = append(wb.data, data...)
return len(data), nil
}
func (wb *writeBuffer) Reset() {
wb.data = wb.data[:0]
}