// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"fmt"
	"math"

	"github.com/ethereum/go-ethereum/rlp"
	"github.com/golang/snappy"
)

// This is the maximum amount of data that will be buffered in memory
// for a single freezer table batch.
const freezerBatchBufferLimit = 2 * 1024 * 1024

// freezerBatch is a write operation of multiple items on a freezer.
type freezerBatch struct {
	tables map[string]*freezerTableBatch
}

func newFreezerBatch(f *Freezer) *freezerBatch {
	batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
	for kind, table := range f.tables {
		batch.tables[kind] = table.newBatch()
	}
	return batch
}
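
// A freezerBatch is typically driven by Freezer.ModifyAncients: the batch is
// reset, items for every table kind are appended in strictly increasing
// order, and commit flushes whatever is still buffered. A minimal sketch of
// that cycle (the "headers" kind and the f, next and header variables are
// illustrative):
//
//	batch := newFreezerBatch(f)
//	batch.reset()
//	if err := batch.Append("headers", next, header); err != nil {
//		return err
//	}
//	item, writeSize, err := batch.commit()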

// Append adds an RLP-encoded item of the given kind.
func (batch *freezerBatch) Append(kind string, num uint64, item interface{}) error {
	return batch.tables[kind].Append(num, item)
}

// AppendRaw adds an item of the given kind.
func (batch *freezerBatch) AppendRaw(kind string, num uint64, item []byte) error {
	return batch.tables[kind].AppendRaw(num, item)
}

// reset initializes the batch.
func (batch *freezerBatch) reset() {
	for _, tb := range batch.tables {
		tb.reset()
	}
}

// commit is called at the end of a write operation and
// writes all remaining data to tables.
func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
	// Check that count agrees on all batches.
	item = uint64(math.MaxUint64)
	for name, tb := range batch.tables {
		if item < math.MaxUint64 && tb.curItem != item {
			return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
		}
		item = tb.curItem
	}

	// Commit all table batches.
	for _, tb := range batch.tables {
		if err := tb.commit(); err != nil {
			return 0, 0, err
		}
		writeSize += tb.totalBytes
	}
	return item, writeSize, nil
}

// freezerTableBatch is a batch for a freezer table.
type freezerTableBatch struct {
	t *freezerTable

	sb          *snappyBuffer
	encBuffer   writeBuffer
	dataBuffer  []byte
	indexBuffer []byte
	curItem     uint64 // expected index of next append
	totalBytes  int64  // counts written bytes since reset
}

// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
	batch := &freezerTableBatch{t: t}
	if !t.noCompression {
		batch.sb = new(snappyBuffer)
	}
	batch.reset()
	return batch
}

// reset clears the batch for reuse.
func (batch *freezerTableBatch) reset() {
	batch.dataBuffer = batch.dataBuffer[:0]
	batch.indexBuffer = batch.indexBuffer[:0]
	batch.curItem = batch.t.items.Load()
	batch.totalBytes = 0
}

// Append rlp-encodes and adds data at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) Append(item uint64, data interface{}) error {
	if item != batch.curItem {
		return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem)
	}

	// Encode the item.
	batch.encBuffer.Reset()
	if err := rlp.Encode(&batch.encBuffer, data); err != nil {
		return err
	}
	encItem := batch.encBuffer.data
	if batch.sb != nil {
		encItem = batch.sb.compress(encItem)
	}
	return batch.appendItem(encItem)
}

// AppendRaw injects a binary blob at the end of the freezer table. The item number is a
// precautionary parameter to ensure data correctness, but the table will reject already
// existing data.
func (batch *freezerTableBatch) AppendRaw(item uint64, blob []byte) error {
	if item != batch.curItem {
		return fmt.Errorf("%w: have %d want %d", errOutOrderInsertion, item, batch.curItem)
	}

	encItem := blob
	if batch.sb != nil {
		encItem = batch.sb.compress(blob)
	}
	return batch.appendItem(encItem)
}

func (batch *freezerTableBatch) appendItem(data []byte) error {
	// Check if item fits into current data file.
	itemSize := int64(len(data))
	itemOffset := batch.t.headBytes + int64(len(batch.dataBuffer))
	if itemOffset+itemSize > int64(batch.t.maxFileSize) {
		// It doesn't fit, go to next file first.
		if err := batch.commit(); err != nil {
			return err
		}
		if err := batch.t.advanceHead(); err != nil {
			return err
		}
		itemOffset = 0
	}

	// Put data to buffer.
	batch.dataBuffer = append(batch.dataBuffer, data...)
	batch.totalBytes += itemSize

	// Put index entry to buffer.
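	// Each index entry records the data file number and the *end* offset of
	// the item within that file; e.g. a 10-byte item written at offset 0 of
	// file 3 is recorded as indexEntry{filenum: 3, offset: 10}. The start
	// offset is recovered from the preceding entry when reading.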
	entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)}
	batch.indexBuffer = entry.append(batch.indexBuffer)
	batch.curItem++

	return batch.maybeCommit()
}

// maybeCommit writes the buffered data if the buffer is full enough.
func (batch *freezerTableBatch) maybeCommit() error {
	if len(batch.dataBuffer) > freezerBatchBufferLimit {
		return batch.commit()
	}
	return nil
}

// commit writes the batched items to the backing freezerTable. Note that the
// index file isn't fsync'd after the file write, so the most recent writes
// can be lost after a power failure.
func (batch *freezerTableBatch) commit() error {
	_, err := batch.t.head.Write(batch.dataBuffer)
	if err != nil {
		return err
	}
	if err := batch.t.head.Sync(); err != nil {
		return err
	}
	dataSize := int64(len(batch.dataBuffer))
	batch.dataBuffer = batch.dataBuffer[:0]
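
	// Only the data file is fsync'd above; the index file is deliberately
	// left to the OS. Since valid index entries are strictly ordered, a
	// garbage tail left by an unclean shutdown can be detected and truncated
	// when the table is reopened.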
	_, err = batch.t.index.Write(batch.indexBuffer)
	if err != nil {
		return err
	}
	indexSize := int64(len(batch.indexBuffer))
	batch.indexBuffer = batch.indexBuffer[:0]

	// Update headBytes of table.
	batch.t.headBytes += dataSize
	batch.t.items.Store(batch.curItem)

	// Update metrics.
	batch.t.sizeGauge.Inc(dataSize + indexSize)
	batch.t.writeMeter.Mark(dataSize + indexSize)
	return nil
}

// snappyBuffer writes snappy in block format, and can be reused. Its
// internal buffer is grown on demand and reused across compress calls.
type snappyBuffer struct {
	dst []byte
}

// compress snappy-compresses the data.
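// The returned slice aliases the buffer's internal storage, so it is only
// valid until the next call to compress.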
func (s *snappyBuffer) compress(data []byte) []byte {
	// The snappy library does not care what the capacity of the buffer is,
	// but only checks the length. If the length is too small, it will
	// allocate a brand new buffer.
	// To avoid that, we check the required size here, and grow the size of the
	// buffer to utilize the full capacity.
	if n := snappy.MaxEncodedLen(len(data)); len(s.dst) < n {
		if cap(s.dst) < n {
			s.dst = make([]byte, n)
		}
		s.dst = s.dst[:n]
	}

	s.dst = snappy.Encode(s.dst, data)
	return s.dst
}

// writeBuffer implements io.Writer for a byte slice.
type writeBuffer struct {
	data []byte
}

func (wb *writeBuffer) Write(data []byte) (int, error) {
	wb.data = append(wb.data, data...)
	return len(data), nil
}

func (wb *writeBuffer) Reset() {
	wb.data = wb.data[:0]
}