mirror of https://github.com/ethereum/go-ethereum

Merge pull request #19244 from karalabe/freezer-2

cmd, core, eth, les, node: chain freezer on top of db rework

commit f5d89cdb72
@@ -0,0 +1,382 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"errors"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/params"
	"github.com/prometheus/tsdb/fileutil"
)

var (
	// errUnknownTable is returned if the user attempts to read from a table that is
	// not tracked by the freezer.
	errUnknownTable = errors.New("unknown table")

	// errOutOrderInsertion is returned if the user attempts to inject out-of-order
	// binary blobs into the freezer.
	errOutOrderInsertion = errors.New("the append operation is out-of-order")

	// errSymlinkDatadir is returned if the ancient directory specified by the user
	// is a symbolic link.
	errSymlinkDatadir = errors.New("symbolic link datadir is not supported")
)

const (
	// freezerRecheckInterval is the frequency to check the key-value database for
	// chain progression that might permit new blocks to be frozen into immutable
	// storage.
	freezerRecheckInterval = time.Minute

	// freezerBatchLimit is the maximum number of blocks to freeze in one batch
	// before doing an fsync and deleting it from the key-value store.
	freezerBatchLimit = 30000
)

// freezer is a memory-mapped append-only database to store immutable chain data
// into flat files:
//
// - The append-only nature ensures that disk writes are minimized.
// - The memory mapping ensures we can max out system memory for caching without
//   reserving it for go-ethereum. This would also reduce the memory requirements
//   of Geth, and thus also GC overhead.
type freezer struct {
	tables       map[string]*freezerTable // Data tables for storing everything
	frozen       uint64                   // Number of blocks already frozen
	instanceLock fileutil.Releaser        // File-system lock to prevent double opens
}
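
For orientation, a minimal usage sketch of the API above (not part of the diff: it would have to live inside package rawdb, since newFreezer and friends are unexported, and it assumes the freezerNoSnappy map and the freezer*Table name constants defined elsewhere in this PR):

	func exampleFreezerUsage(datadir string) error {
		f, err := newFreezer(datadir, "eth/db/chaindata/") // namespace string is an assumption
		if err != nil {
			return err
		}
		defer f.Close()
		// Blocks must be appended strictly in order, starting at f.frozen (0 on a fresh dir).
		hash := make([]byte, 32) // placeholder block hash
		stub := []byte{0xc0}     // placeholder RLP payloads
		if err := f.AppendAncient(0, hash, stub, stub, stub, []byte{0x80}); err != nil {
			return err
		}
		// Read the frozen header back through the generic accessor.
		header, err := f.Ancient(freezerHeaderTable, 0)
		if err != nil {
			return err
		}
		fmt.Printf("frozen header RLP: %x\n", header)
		return nil
	}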

// newFreezer creates a chain freezer that moves ancient chain data into
// append-only flat file containers.
func newFreezer(datadir string, namespace string) (*freezer, error) {
	// Create the initial freezer object
	var (
		readMeter  = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
		writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
	)
	// Ensure the datadir is not a symbolic link if it exists.
	if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
		if info.Mode()&os.ModeSymlink != 0 {
			log.Warn("Symbolic link ancient database is not supported", "path", datadir)
			return nil, errSymlinkDatadir
		}
	}
	// Leveldb uses LOCK as the filelock filename. To prevent the
	// name collision, we use FLOCK as the lock name.
	lock, _, err := fileutil.Flock(filepath.Join(datadir, "FLOCK"))
	if err != nil {
		return nil, err
	}
	// Open all the supported data tables
	freezer := &freezer{
		tables:       make(map[string]*freezerTable),
		instanceLock: lock,
	}
	for name, disableSnappy := range freezerNoSnappy {
		table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy)
		if err != nil {
			for _, table := range freezer.tables {
				table.Close()
			}
			lock.Release()
			return nil, err
		}
		freezer.tables[name] = table
	}
	if err := freezer.repair(); err != nil {
		for _, table := range freezer.tables {
			table.Close()
		}
		lock.Release()
		return nil, err
	}
	log.Info("Opened ancient database", "database", datadir)
	return freezer, nil
}

// Close terminates the chain freezer, unmapping all the data files.
func (f *freezer) Close() error {
	var errs []error
	for _, table := range f.tables {
		if err := table.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	if err := f.instanceLock.Release(); err != nil {
		errs = append(errs, err)
	}
	if errs != nil {
		return fmt.Errorf("%v", errs)
	}
	return nil
}

// HasAncient returns an indicator whether the specified ancient data exists
// in the freezer.
func (f *freezer) HasAncient(kind string, number uint64) (bool, error) {
	if table := f.tables[kind]; table != nil {
		return table.has(number), nil
	}
	return false, nil
}

// Ancient retrieves an ancient binary blob from the append-only immutable files.
func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
	if table := f.tables[kind]; table != nil {
		return table.Retrieve(number)
	}
	return nil, errUnknownTable
}

// Ancients returns the number of frozen items.
func (f *freezer) Ancients() (uint64, error) {
	return atomic.LoadUint64(&f.frozen), nil
}

// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
	if table := f.tables[kind]; table != nil {
		return table.size()
	}
	return 0, errUnknownTable
}

// AppendAncient injects all binary blobs belonging to a block at the end of the
// append-only immutable table files.
//
// Notably, this function is lock-free but only loosely thread-safe: all
// out-of-order injections are rejected, but two concurrent injections with the
// same number can corrupt the store.
func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td []byte) (err error) {
	// Ensure the binary blobs we are appending are contiguous with the freezer.
	if atomic.LoadUint64(&f.frozen) != number {
		return errOutOrderInsertion
	}
	// Roll back all inserted data if any insertion below fails, to ensure
	// the tables don't go out of sync.
	defer func() {
		if err != nil {
			rerr := f.repair()
			if rerr != nil {
				log.Crit("Failed to repair freezer", "err", rerr)
			}
			log.Info("Append ancient failed", "number", number, "err", err)
		}
	}()
	// Inject all the components into the relevant data tables
	if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil {
		log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil {
		log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil {
		log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil {
		log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil {
		log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	atomic.AddUint64(&f.frozen, 1) // Only modify atomically
	return nil
}

// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
	if atomic.LoadUint64(&f.frozen) <= items {
		return nil
	}
	for _, table := range f.tables {
		if err := table.truncate(items); err != nil {
			return err
		}
	}
	atomic.StoreUint64(&f.frozen, items)
	return nil
}

// Sync flushes all data tables to disk.
func (f *freezer) Sync() error {
	var errs []error
	for _, table := range f.tables {
		if err := table.Sync(); err != nil {
			errs = append(errs, err)
		}
	}
	if errs != nil {
		return fmt.Errorf("%v", errs)
	}
	return nil
}

// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
// This functionality is deliberately broken off from block importing to avoid
// incurring additional data shuffling delays on block propagation.
func (f *freezer) freeze(db ethdb.KeyValueStore) {
	nfdb := &nofreezedb{KeyValueStore: db}

	for {
		// Retrieve the freezing threshold.
		hash := ReadHeadBlockHash(nfdb)
		if hash == (common.Hash{}) {
			log.Debug("Current full block hash unavailable") // new chain, empty database
			time.Sleep(freezerRecheckInterval)
			continue
		}
		number := ReadHeaderNumber(nfdb, hash)
		switch {
		case number == nil:
			log.Error("Current full block number unavailable", "hash", hash)
			time.Sleep(freezerRecheckInterval)
			continue

		case *number < params.ImmutabilityThreshold:
			log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
			time.Sleep(freezerRecheckInterval)
			continue

		case *number-params.ImmutabilityThreshold <= f.frozen:
			log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
			time.Sleep(freezerRecheckInterval)
			continue
		}
		head := ReadHeader(nfdb, hash, *number)
		if head == nil {
			log.Error("Current full block unavailable", "number", *number, "hash", hash)
			time.Sleep(freezerRecheckInterval)
			continue
		}
		// Seems we have data ready to be frozen, process in usable batches
		limit := *number - params.ImmutabilityThreshold
		if limit-f.frozen > freezerBatchLimit {
			limit = f.frozen + freezerBatchLimit
		}
		var (
			start    = time.Now()
			first    = f.frozen
			ancients = make([]common.Hash, 0, limit)
		)
		for f.frozen < limit {
			// Retrieve all the components of the canonical block
			hash := ReadCanonicalHash(nfdb, f.frozen)
			if hash == (common.Hash{}) {
				log.Error("Canonical hash missing, can't freeze", "number", f.frozen)
				break
			}
			header := ReadHeaderRLP(nfdb, hash, f.frozen)
			if len(header) == 0 {
				log.Error("Block header missing, can't freeze", "number", f.frozen, "hash", hash)
				break
			}
			body := ReadBodyRLP(nfdb, hash, f.frozen)
			if len(body) == 0 {
				log.Error("Block body missing, can't freeze", "number", f.frozen, "hash", hash)
				break
			}
			receipts := ReadReceiptsRLP(nfdb, hash, f.frozen)
			if len(receipts) == 0 {
				log.Error("Block receipts missing, can't freeze", "number", f.frozen, "hash", hash)
				break
			}
			td := ReadTdRLP(nfdb, hash, f.frozen)
			if len(td) == 0 {
				log.Error("Total difficulty missing, can't freeze", "number", f.frozen, "hash", hash)
				break
			}
			log.Trace("Deep froze ancient block", "number", f.frozen, "hash", hash)
			// Inject all the components into the relevant data tables
			if err := f.AppendAncient(f.frozen, hash[:], header, body, receipts, td); err != nil {
				break
			}
			ancients = append(ancients, hash)
		}
		// Batch of blocks have been frozen, flush them before wiping from leveldb
		if err := f.Sync(); err != nil {
			log.Crit("Failed to flush frozen tables", "err", err)
		}
		// Wipe out all data from the active database
		batch := db.NewBatch()
		for i := 0; i < len(ancients); i++ {
			DeleteBlockWithoutNumber(batch, ancients[i], first+uint64(i))
			DeleteCanonicalHash(batch, first+uint64(i))
		}
		if err := batch.Write(); err != nil {
			log.Crit("Failed to delete frozen canonical blocks", "err", err)
		}
		batch.Reset()
		// Wipe out side chains also.
		for number := first; number < f.frozen; number++ {
			for _, hash := range ReadAllHashes(db, number) {
				DeleteBlock(batch, hash, number)
			}
		}
		if err := batch.Write(); err != nil {
			log.Crit("Failed to delete frozen side blocks", "err", err)
		}
		// Log something friendly for the user
		context := []interface{}{
			"blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1,
		}
		if n := len(ancients); n > 0 {
			context = append(context, []interface{}{"hash", ancients[n-1]}...)
		}
		log.Info("Deep froze chain segment", context...)

		// Avoid database thrashing with tiny writes
		if f.frozen-first < freezerBatchLimit {
			time.Sleep(freezerRecheckInterval)
		}
	}
}
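
To make the batching arithmetic in freeze() concrete, a hedged helper sketch (it assumes params.ImmutabilityThreshold is 90000, its value as defined elsewhere in this PR):

	// freezeLimit mirrors the threshold checks above: it returns the first
	// block number that must stay in the key-value store.
	func freezeLimit(head, frozen uint64) uint64 {
		const threshold = 90000 // assumed value of params.ImmutabilityThreshold
		if head < threshold || head-threshold <= frozen {
			return frozen // nothing old enough to freeze yet
		}
		limit := head - threshold
		if limit-frozen > freezerBatchLimit {
			limit = frozen + freezerBatchLimit
		}
		return limit // e.g. freezeLimit(200000, 0) == 30000
	}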

// repair truncates all data tables to the same length.
func (f *freezer) repair() error {
	min := uint64(math.MaxUint64)
	for _, table := range f.tables {
		items := atomic.LoadUint64(&table.items)
		if min > items {
			min = items
		}
	}
	for _, table := range f.tables {
		if err := table.truncate(min); err != nil {
			return err
		}
	}
	atomic.StoreUint64(&f.frozen, min)
	return nil
}
@@ -0,0 +1,561 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"encoding/binary"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/golang/snappy"
)

var (
	// errClosed is returned if an operation attempts to read from or write to the
	// freezer table after it has already been closed.
	errClosed = errors.New("closed")

	// errOutOfBounds is returned if the item requested is not contained within the
	// freezer table.
	errOutOfBounds = errors.New("out of bounds")

	// errNotSupported is returned if the database doesn't support the required operation.
	errNotSupported = errors.New("this operation is not supported")
)

// indexEntry contains the number/id of the file that the data resides in, as well
// as the offset within the file to the end of the data.
// In serialized form, the filenum is stored as uint16.
type indexEntry struct {
	filenum uint32 // stored as uint16 (2 bytes)
	offset  uint32 // stored as uint32 (4 bytes)
}

const indexEntrySize = 6

// unmarshalBinary deserializes binary b into the rawIndex entry.
func (i *indexEntry) unmarshalBinary(b []byte) error {
	i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
	i.offset = binary.BigEndian.Uint32(b[2:6])
	return nil
}

// marshallBinary serializes the rawIndex entry into binary.
func (i *indexEntry) marshallBinary() []byte {
	b := make([]byte, indexEntrySize)
	binary.BigEndian.PutUint16(b[:2], uint16(i.filenum))
	binary.BigEndian.PutUint32(b[2:6], i.offset)
	return b
}
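
A worked example of the 6-byte encoding (not in the diff; it would live inside package rawdb): filenum 2 and offset 4 serialize big-endian as 00 02 | 00 00 00 04.

	func exampleIndexEntryEncoding() {
		idx := indexEntry{filenum: 2, offset: 4}
		b := idx.marshallBinary()
		fmt.Printf("% x\n", b) // prints: 00 02 00 00 00 04

		var back indexEntry
		back.unmarshalBinary(b)
		fmt.Println(back.filenum, back.offset) // prints: 2 4
	}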

// freezerTable represents a single chained data table within the freezer (e.g. blocks).
// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
// file (uncompressed 48-bit indices into the data file).
type freezerTable struct {
	noCompression bool   // if true, disables snappy compression. Note: does not work retroactively
	maxFileSize   uint32 // Max file size for data-files
	name          string
	path          string

	head   *os.File            // File descriptor for the data head of the table
	files  map[uint32]*os.File // open files
	headId uint32              // number of the currently active head file
	tailId uint32              // number of the earliest file
	index  *os.File            // File descriptor for the indexEntry file of the table

	// In the case that old items are deleted (from the tail), we use itemOffset
	// to count how many historic items have gone missing.
	items      uint64 // Number of items stored in the table (including items removed from tail)
	itemOffset uint32 // Offset (number of discarded items)

	headBytes  uint32        // Number of bytes written to the head file
	readMeter  metrics.Meter // Meter for measuring the effective amount of data read
	writeMeter metrics.Meter // Meter for measuring the effective amount of data written

	logger log.Logger   // Logger with database path and table name embedded
	lock   sync.RWMutex // Mutex protecting the data file descriptors
}

// newTable opens a freezer table with default settings - 2G files
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) {
	return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
}

// newCustomTable opens a freezer table, creating the data and index files if they are
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
	// Ensure the containing directory exists and open the indexEntry file
	if err := os.MkdirAll(path, 0755); err != nil {
		return nil, err
	}
	var idxName string
	if noCompression {
		idxName = fmt.Sprintf("%s.ridx", name) // raw index
	} else {
		idxName = fmt.Sprintf("%s.cidx", name) // compressed index
	}
	offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		return nil, err
	}
	// Create the table and repair any past inconsistency
	tab := &freezerTable{
		index:         offsets,
		files:         make(map[uint32]*os.File),
		readMeter:     readMeter,
		writeMeter:    writeMeter,
		name:          name,
		path:          path,
		logger:        log.New("database", path, "table", name),
		noCompression: noCompression,
		maxFileSize:   maxFilesize,
	}
	if err := tab.repair(); err != nil {
		tab.Close()
		return nil, err
	}
	return tab, nil
}

// repair cross-checks the head and the index file and truncates them to
// be in sync with each other after a potential crash / data loss.
func (t *freezerTable) repair() error {
	// Create a temporary offset buffer to init files with and read indexEntry into
	buffer := make([]byte, indexEntrySize)

	// If we've just created the files, initialize the index with the 0 indexEntry
	stat, err := t.index.Stat()
	if err != nil {
		return err
	}
	if stat.Size() == 0 {
		if _, err := t.index.Write(buffer); err != nil {
			return err
		}
	}
	// Ensure the index is a multiple of indexEntrySize bytes
	if overflow := stat.Size() % indexEntrySize; overflow != 0 {
		t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path
	}
	// Retrieve the file sizes and prepare for truncation
	if stat, err = t.index.Stat(); err != nil {
		return err
	}
	offsetsSize := stat.Size()

	// Open the head file
	var (
		firstIndex  indexEntry
		lastIndex   indexEntry
		contentSize int64
		contentExp  int64
	)
	// Read index zero, determine which file is the earliest
	// and what item offset to use
	t.index.ReadAt(buffer, 0)
	firstIndex.unmarshalBinary(buffer)

	t.tailId = firstIndex.offset
	t.itemOffset = firstIndex.filenum

	t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
	lastIndex.unmarshalBinary(buffer)
	t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
	if err != nil {
		return err
	}
	if stat, err = t.head.Stat(); err != nil {
		return err
	}
	contentSize = stat.Size()

	// Keep truncating both files until they come in sync
	contentExp = int64(lastIndex.offset)

	for contentExp != contentSize {
		// Truncate the head file to the last offset pointer
		if contentExp < contentSize {
			t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
			if err := t.head.Truncate(contentExp); err != nil {
				return err
			}
			contentSize = contentExp
		}
		// Truncate the index to point within the head file
		if contentExp > contentSize {
			t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
			if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil {
				return err
			}
			offsetsSize -= indexEntrySize
			t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
			var newLastIndex indexEntry
			newLastIndex.unmarshalBinary(buffer)
			// We might have slipped back into an earlier head-file here
			if newLastIndex.filenum != lastIndex.filenum {
				// Release the earlier opened file
				t.releaseFile(lastIndex.filenum)
				if t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND); err != nil {
					return err
				}
				if stat, err = t.head.Stat(); err != nil {
					// TODO, anything more we can do here?
					// A data file has gone missing...
					return err
				}
				contentSize = stat.Size()
			}
			lastIndex = newLastIndex
			contentExp = int64(lastIndex.offset)
		}
	}
	// Ensure all reparation changes have been written to disk
	if err := t.index.Sync(); err != nil {
		return err
	}
	if err := t.head.Sync(); err != nil {
		return err
	}
	// Update the item and byte counters and return
	t.items = uint64(t.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // the last indexEntry points to the end of the data file
	t.headBytes = uint32(contentSize)
	t.headId = lastIndex.filenum

	// Close opened files and preopen all files
	if err := t.preopen(); err != nil {
		return err
	}
	t.logger.Debug("Chain freezer table opened", "items", t.items, "size", common.StorageSize(t.headBytes))
	return nil
}

// preopen opens all files that the freezer will need. This method should be called from an init-context,
// since it assumes that it doesn't have to bother with locking.
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
// obtain a write-lock within Retrieve.
func (t *freezerTable) preopen() (err error) {
	// The repair might have already opened (some) files
	t.releaseFilesAfter(0, false)
	// Open all except head in RDONLY
	for i := t.tailId; i < t.headId; i++ {
		if _, err = t.openFile(i, os.O_RDONLY); err != nil {
			return err
		}
	}
	// Open head in read/write
	t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND)
	return err
}

// truncate discards any recent data above the provided threshold number.
func (t *freezerTable) truncate(items uint64) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// If our item count is correct, don't do anything
	if atomic.LoadUint64(&t.items) <= items {
		return nil
	}
	// Something's out of sync, truncate the table's offset index
	t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
	if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil {
		return err
	}
	// Calculate the new expected size of the data file and truncate it
	buffer := make([]byte, indexEntrySize)
	if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil {
		return err
	}
	var expected indexEntry
	expected.unmarshalBinary(buffer)

	// We might need to truncate back to older files
	if expected.filenum != t.headId {
		// If already open for reading, force-reopen for writing
		t.releaseFile(expected.filenum)
		newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
		if err != nil {
			return err
		}
		// Release any files _after_ the current head -- both the previous head
		// and any files which may have been opened for reading
		t.releaseFilesAfter(expected.filenum, true)
		// Set back the historic head
		t.head = newHead
		atomic.StoreUint32(&t.headId, expected.filenum)
	}
	if err := t.head.Truncate(int64(expected.offset)); err != nil {
		return err
	}
	// All data files truncated, set internal counters and return
	atomic.StoreUint64(&t.items, items)
	atomic.StoreUint32(&t.headBytes, expected.offset)
	return nil
}

// Close closes all opened files.
func (t *freezerTable) Close() error {
	t.lock.Lock()
	defer t.lock.Unlock()

	var errs []error
	if err := t.index.Close(); err != nil {
		errs = append(errs, err)
	}
	t.index = nil

	for _, f := range t.files {
		if err := f.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	t.head = nil

	if errs != nil {
		return fmt.Errorf("%v", errs)
	}
	return nil
}

// openFile assumes that the write-lock is held by the caller.
func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
	var exist bool
	if f, exist = t.files[num]; !exist {
		var name string
		if t.noCompression {
			name = fmt.Sprintf("%s.%04d.rdat", t.name, num)
		} else {
			name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
		}
		f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644)
		if err != nil {
			return nil, err
		}
		t.files[num] = f
	}
	return f, err
}

// releaseFile closes a file, and removes it from the open file cache.
// Assumes that the caller holds the write lock.
func (t *freezerTable) releaseFile(num uint32) {
	if f, exist := t.files[num]; exist {
		delete(t.files, num)
		f.Close()
	}
}

// releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files.
func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
	for fnum, f := range t.files {
		if fnum > num {
			delete(t.files, fnum)
			f.Close()
			if remove {
				os.Remove(f.Name())
			}
		}
	}
}

// Append injects a binary blob at the end of the freezer table. The item number
// is a precautionary parameter to ensure data correctness, but the table will
// reject already existing data.
//
// Note, this method will *not* flush any data to disk, so be sure to explicitly
// fsync before irreversibly deleting data from the database.
func (t *freezerTable) Append(item uint64, blob []byte) error {
	// Read lock prevents competition with truncate
	t.lock.RLock()
	// Ensure the table is still accessible
	if t.index == nil || t.head == nil {
		t.lock.RUnlock()
		return errClosed
	}
	// Ensure only the next item can be written, nothing else
	if atomic.LoadUint64(&t.items) != item {
		t.lock.RUnlock()
		return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item)
	}
	// Encode the blob and write it into the data file
	if !t.noCompression {
		blob = snappy.Encode(nil, blob)
	}
	bLen := uint32(len(blob))
	if t.headBytes+bLen < bLen ||
		t.headBytes+bLen > t.maxFileSize {
		// Writing would overflow the head file, so we need a new one
		t.lock.RUnlock()
		t.lock.Lock()
		nextId := atomic.LoadUint32(&t.headId) + 1
		// We open the next file in truncated mode -- if this file already
		// exists, we need to start over from scratch on it
		newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC)
		if err != nil {
			t.lock.Unlock()
			return err
		}
		// Close the old file, and reopen in RDONLY mode
		t.releaseFile(t.headId)
		t.openFile(t.headId, os.O_RDONLY)

		// Swap out the current head
		t.head = newHead
		atomic.StoreUint32(&t.headBytes, 0)
		atomic.StoreUint32(&t.headId, nextId)
		t.lock.Unlock()
		t.lock.RLock()
	}

	defer t.lock.RUnlock()

	if _, err := t.head.Write(blob); err != nil {
		return err
	}
	newOffset := atomic.AddUint32(&t.headBytes, bLen)
	idx := indexEntry{
		filenum: atomic.LoadUint32(&t.headId),
		offset:  newOffset,
	}
	// Write indexEntry
	t.index.Write(idx.marshallBinary())
	t.writeMeter.Mark(int64(bLen + indexEntrySize))
	atomic.AddUint64(&t.items, 1)
	return nil
}

// getBounds returns the indexes for the item.
// It returns start, end, filenumber and error.
func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
	var startIdx, endIdx indexEntry
	buffer := make([]byte, indexEntrySize)
	if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
		return 0, 0, 0, err
	}
	startIdx.unmarshalBinary(buffer)
	if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
		return 0, 0, 0, err
	}
	endIdx.unmarshalBinary(buffer)
	if startIdx.filenum != endIdx.filenum {
		// If a piece of data 'crosses' a data-file,
		// it's actually in one piece on the second data-file.
		// We return a zero-indexEntry for the second file as start.
		return 0, endIdx.offset, endIdx.filenum, nil
	}
	return startIdx.offset, endIdx.offset, endIdx.filenum, nil
}
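
A worked example of the boundary case (illustrative numbers, not from the diff): suppose item N closed data file 1 at offset 45, and item N+1 was written into file 2, ending at offset 15. Then startIdx = {filenum: 1, offset: 45} and endIdx = {filenum: 2, offset: 15}; since the file numbers differ, the blob lives wholly in file 2 and getBounds returns (0, 15, 2), i.e. read bytes [0, 15) of data file 2.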

// Retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file.
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
	// Ensure the table and the item are accessible
	if t.index == nil || t.head == nil {
		return nil, errClosed
	}
	if atomic.LoadUint64(&t.items) <= item {
		return nil, errOutOfBounds
	}
	// Ensure the item was not deleted from the tail either
	offset := atomic.LoadUint32(&t.itemOffset)
	if uint64(offset) > item {
		return nil, errOutOfBounds
	}
	t.lock.RLock()
	startOffset, endOffset, filenum, err := t.getBounds(item - uint64(offset))
	if err != nil {
		t.lock.RUnlock()
		return nil, err
	}
	dataFile, exist := t.files[filenum]
	if !exist {
		t.lock.RUnlock()
		return nil, fmt.Errorf("missing data file %d", filenum)
	}
	// Retrieve the data itself, decompress and return
	blob := make([]byte, endOffset-startOffset)
	if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
		t.lock.RUnlock()
		return nil, err
	}
	t.lock.RUnlock()
	t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))

	if t.noCompression {
		return blob, nil
	}
	return snappy.Decode(nil, blob)
}

// has returns an indicator whether the data with the specified number
// exists in the freezer table.
func (t *freezerTable) has(number uint64) bool {
	return atomic.LoadUint64(&t.items) > number
}

// size returns the total data size in the freezer table.
func (t *freezerTable) size() (uint64, error) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	stat, err := t.index.Stat()
	if err != nil {
		return 0, err
	}
	total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size())
	return total, nil
}
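
Worked arithmetic for size() (illustrative numbers): with maxFileSize = 50, tailId = 0, headId = 2, headBytes = 15 and a 10-entry (60-byte) index file, size() reports 50*(2-0) + 15 + 60 = 175 bytes. Non-head data files are assumed full, so the result is an estimate (an upper bound on the data files) rather than an exact on-disk count.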

// Sync pushes any pending data from memory out to disk. This is an expensive
// operation, so use it with care.
func (t *freezerTable) Sync() error {
	if err := t.index.Sync(); err != nil {
		return err
	}
	return t.head.Sync()
}

// printIndex is a debug print utility function for testing.
func (t *freezerTable) printIndex() {
	buf := make([]byte, indexEntrySize)

	fmt.Printf("|-----------------|\n")
	fmt.Printf("| fileno | offset |\n")
	fmt.Printf("|--------+--------|\n")

	for i := uint64(0); ; i++ {
		if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil {
			break
		}
		var entry indexEntry
		entry.unmarshalBinary(buf)
		fmt.Printf("|  %03d   |  %03d   |\n", entry.filenum, entry.offset)
		if i > 100 {
			fmt.Printf(" ... \n")
			break
		}
	}
	fmt.Printf("|-----------------|\n")
}
@@ -0,0 +1,609 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"bytes"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func init() {
	rand.Seed(time.Now().Unix())
}

// getChunk returns a chunk of `size` bytes, each filled with the value b.
func getChunk(size int, b int) []byte {
	data := make([]byte, size)
	for i := range data {
		data[i] = byte(b)
	}
	return data
}

// print retrieves an item from the table and dumps it, for debugging.
func print(t *testing.T, f *freezerTable, item uint64) {
	a, err := f.Retrieve(item)
	if err != nil {
		t.Fatal(err)
	}
	fmt.Printf("db[%d] = %x\n", item, a)
}

// TestFreezerBasics tests initializing a freezerTable from scratch, writing to the table,
// and reading it back.
func TestFreezerBasics(t *testing.T) {
	t.Parallel()
	// set the cutoff at 50 bytes
	f, err := newCustomTable(os.TempDir(),
		fmt.Sprintf("unittest-%d", rand.Uint64()),
		metrics.NewMeter(), metrics.NewMeter(), 50, true)
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()
	// Write 15 bytes 255 times, which results in 85 files
	for x := 0; x < 255; x++ {
		data := getChunk(15, x)
		f.Append(uint64(x), data)
	}

	//print(t, f, 0)
	//print(t, f, 1)
	//print(t, f, 2)
	//
	//db[0] = 000000000000000000000000000000
	//db[1] = 010101010101010101010101010101
	//db[2] = 020202020202020202020202020202

	for y := 0; y < 255; y++ {
		exp := getChunk(15, y)
		got, err := f.Retrieve(uint64(y))
		if err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(got, exp) {
			t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
		}
	}
	// Check that we cannot read too far
	_, err = f.Retrieve(uint64(255))
	if err != errOutOfBounds {
		t.Fatal(err)
	}
}

// TestFreezerBasicsClosing tests the same as TestFreezerBasics, but also closes and
// reopens the freezer between every operation.
func TestFreezerBasicsClosing(t *testing.T) {
	t.Parallel()
	// set the cutoff at 50 bytes
	var (
		fname  = fmt.Sprintf("basics-close-%d", rand.Uint64())
		m1, m2 = metrics.NewMeter(), metrics.NewMeter()
		f      *freezerTable
		err    error
	)
	f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
	if err != nil {
		t.Fatal(err)
	}
	// Write 15 bytes 255 times, which results in 85 files
	for x := 0; x < 255; x++ {
		data := getChunk(15, x)
		f.Append(uint64(x), data)
		f.Close()
		f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
		if err != nil {
			t.Fatal(err)
		}
	}
	defer f.Close()

	for y := 0; y < 255; y++ {
		exp := getChunk(15, y)
		got, err := f.Retrieve(uint64(y))
		if err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(got, exp) {
			t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
		}
		f.Close()
		f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestFreezerRepairDanglingHead tests that we can recover if index entries are removed.
func TestFreezerRepairDanglingHead(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())

	{ // Fill table
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 255 times
		for x := 0; x < 255; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		// The last item should be there
		if _, err = f.Retrieve(0xfe); err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	// Open the index
	idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
	if err != nil {
		t.Fatalf("Failed to open index file: %v", err)
	}
	// Remove 4 bytes
	stat, err := idxFile.Stat()
	if err != nil {
		t.Fatalf("Failed to stat index file: %v", err)
	}
	idxFile.Truncate(stat.Size() - 4)
	idxFile.Close()
	// Now open it again
	{
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// The last item should be missing
		if _, err = f.Retrieve(0xff); err == nil {
			t.Errorf("Expected error for missing index entry")
		}
		// The one before should still be there
		if _, err = f.Retrieve(0xfd); err != nil {
			t.Fatalf("Expected no error, got %v", err)
		}
	}
}

// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed.
func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())

	{ // Fill a table and close it
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 255 times
		for x := 0; x < 0xff; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		// The last item should be there
		if _, err = f.Retrieve(f.items - 1); err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	// Open the index
	idxFile, err := os.OpenFile(filepath.Join(os.TempDir(), fmt.Sprintf("%s.ridx", fname)), os.O_RDWR, 0644)
	if err != nil {
		t.Fatalf("Failed to open index file: %v", err)
	}
	// Remove everything but the first item, and leave the data unaligned:
	// 0-indexEntry, 1-indexEntry, corrupt-indexEntry
	idxFile.Truncate(indexEntrySize + indexEntrySize + indexEntrySize/2)
	idxFile.Close()
	// Now open it again
	{
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// The first item should be there
		if _, err = f.Retrieve(0); err != nil {
			t.Fatal(err)
		}
		// The second item should be missing
		if _, err = f.Retrieve(1); err == nil {
			t.Errorf("Expected error for missing index entry")
		}
		// We should now be able to store items again, from item = 1
		for x := 1; x < 0xff; x++ {
			data := getChunk(15, ^x)
			f.Append(uint64(x), data)
		}
		f.Close()
	}
	// And if we open it, we should now be able to read all of them (new values)
	{
		f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		for y := 1; y < 255; y++ {
			exp := getChunk(15, ^y)
			got, err := f.Retrieve(uint64(y))
			if err != nil {
				t.Fatal(err)
			}
			if !bytes.Equal(got, exp) {
				t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
			}
		}
	}
}

// TestSnappyDetection tests that a table written with one compression setting
// cannot be read when reopened with the other, and vice versa.
func TestSnappyDetection(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
	// Open without compression (raw) and fill the table
	{
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 255 times
		for x := 0; x < 0xff; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		f.Close()
	}
	// Open with snappy compression: it uses different file names, so none of
	// the raw data is visible and the table comes up empty
	{
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false)
		if err != nil {
			t.Fatal(err)
		}
		if _, err = f.Retrieve(0); err == nil {
			f.Close()
			t.Fatalf("expected empty table")
		}
	}

	// Reopen without compression
	{
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// There should be 255 items
		if _, err = f.Retrieve(0xfe); err != nil {
			f.Close()
			t.Fatalf("expected no error, got %v", err)
		}
	}
}

func assertFileSize(f string, size int64) error {
	stat, err := os.Stat(f)
	if err != nil {
		return err
	}
	if stat.Size() != size {
		return fmt.Errorf("error, expected size %d, got %d", size, stat.Size())
	}
	return nil
}

// TestFreezerRepairDanglingIndex checks that if the index has more entries than there is data,
// the index is repaired.
func TestFreezerRepairDanglingIndex(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())

	{ // Fill a table and close it
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 9 times: 135 bytes
		for x := 0; x < 9; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		// The last item should be there
		if _, err = f.Retrieve(f.items - 1); err != nil {
			f.Close()
			t.Fatal(err)
		}
		f.Close()
		// File sizes should be 45, 45, 45: items [3, 3, 3]
	}
	// Crop the third file
	fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0002.rdat", fname))
	// Truncate the third file: 45, 45, 20
	{
		if err := assertFileSize(fileToCrop, 45); err != nil {
			t.Fatal(err)
		}
		file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644)
		if err != nil {
			t.Fatal(err)
		}
		file.Truncate(20)
		file.Close()
	}
	// Open the db again.
	// It should restore the file(s) to
	// 45, 45, 15
	// with 3+3+1 items
	{
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		if f.items != 7 {
			f.Close()
			t.Fatalf("expected %d items, got %d", 7, f.items)
		}
		if err := assertFileSize(fileToCrop, 15); err != nil {
			t.Fatal(err)
		}
	}
}

func TestFreezerTruncate(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("truncation-%d", rand.Uint64())

	{ // Fill table
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 30 times
		for x := 0; x < 30; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		// The last item should be there
		if _, err = f.Retrieve(f.items - 1); err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	// Reopen, truncate
	{
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		defer f.Close()
		f.truncate(10) // 150 bytes
		if f.items != 10 {
			t.Fatalf("expected %d items, got %d", 10, f.items)
		}
		// 45, 45, 45, 15 -- bytes should be 15
		if f.headBytes != 15 {
			t.Fatalf("expected %d bytes, got %d", 15, f.headBytes)
		}
	}
}

// TestFreezerRepairFirstFile tests a head file with the very first item only half-written.
// That will rewind the index, and _should_ truncate the head file.
func TestFreezerRepairFirstFile(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
	{ // Fill table
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 80 bytes, splitting out into two files
		f.Append(0, getChunk(40, 0xFF))
		f.Append(1, getChunk(40, 0xEE))
		// The last item should be there
		if _, err = f.Retrieve(f.items - 1); err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	// Truncate the file in half
	fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname))
	{
		if err := assertFileSize(fileToCrop, 40); err != nil {
			t.Fatal(err)
		}
		file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644)
		if err != nil {
			t.Fatal(err)
		}
		file.Truncate(20)
		file.Close()
	}
	// Reopen
	{
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		if f.items != 1 {
			f.Close()
			t.Fatalf("expected %d items, got %d", 1, f.items)
		}
		// Write 40 bytes
		f.Append(1, getChunk(40, 0xDD))
		f.Close()
		// Should have been truncated down to zero and then 40 written
		if err := assertFileSize(fileToCrop, 40); err != nil {
			t.Fatal(err)
		}
	}
}

// TestFreezerReadAndTruncate tests:
// - we have a table open
// - do some reads, so files are open in readonly
// - truncate so those files are 'removed'
// - check that we did not keep the rdonly file descriptors
func TestFreezerReadAndTruncate(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
	{ // Fill table
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 15 bytes 30 times
		for x := 0; x < 30; x++ {
			data := getChunk(15, x)
			f.Append(uint64(x), data)
		}
		// The last item should be there
		if _, err = f.Retrieve(f.items - 1); err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	// Reopen and read all files
	{
		f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
		if err != nil {
			t.Fatal(err)
		}
		if f.items != 30 {
			f.Close()
			t.Fatalf("expected %d items, got %d", 30, f.items)
		}
		for y := byte(0); y < 30; y++ {
			f.Retrieve(uint64(y))
		}
		// Now, truncate back to zero
		f.truncate(0)
		// Write the data again
		for x := 0; x < 30; x++ {
			data := getChunk(15, ^x)
			if err := f.Append(uint64(x), data); err != nil {
				t.Fatalf("error %v", err)
			}
		}
		f.Close()
	}
}

func TestOffset(t *testing.T) {
	t.Parallel()
	wm, rm := metrics.NewMeter(), metrics.NewMeter()
	fname := fmt.Sprintf("offset-%d", rand.Uint64())
	{ // Fill table
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
		if err != nil {
			t.Fatal(err)
		}
		// Write 6 x 20 bytes, splitting out into three files
		f.Append(0, getChunk(20, 0xFF))
		f.Append(1, getChunk(20, 0xEE))

		f.Append(2, getChunk(20, 0xdd))
		f.Append(3, getChunk(20, 0xcc))

		f.Append(4, getChunk(20, 0xbb))
		f.Append(5, getChunk(20, 0xaa))
		f.printIndex()
		f.Close()
	}
	// Now crop it.
	{
		// Delete files 0 and 1
		for i := 0; i < 2; i++ {
			p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i))
			if err := os.Remove(p); err != nil {
				t.Fatal(err)
			}
		}
		// Read the index file
		p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
		indexFile, err := os.OpenFile(p, os.O_RDWR, 0644)
		if err != nil {
			t.Fatal(err)
		}
		indexBuf := make([]byte, 7*indexEntrySize)
		indexFile.Read(indexBuf)

		// Update the index file, so that we store
		// [ file = 2, offset = 4 ] at index zero

		tailId := uint32(2)     // First file is 2
		itemOffset := uint32(4) // We have removed four items
		zeroIndex := indexEntry{
			// Note: index zero repurposes the entry fields; repair() reads
			// the tailId from .offset and the itemOffset from .filenum.
			offset:  tailId,
			filenum: itemOffset,
		}
		buf := zeroIndex.marshallBinary()
		// Overwrite index zero
		copy(indexBuf, buf)
		// Remove the four next indices by overwriting
		copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:])
		indexFile.WriteAt(indexBuf, 0)
		// Need to truncate the moved index items
		indexFile.Truncate(indexEntrySize * (1 + 2))
		indexFile.Close()
	}
	// Now open again
	{
		f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
		if err != nil {
			t.Fatal(err)
		}
		f.printIndex()
		// It should allow writing item 6
		f.Append(6, getChunk(20, 0x99))

		// It should be fine to fetch 4, 5 and 6
		if got, err := f.Retrieve(4); err != nil {
			t.Fatal(err)
		} else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) {
			t.Fatalf("expected %x got %x", exp, got)
		}
		if got, err := f.Retrieve(5); err != nil {
			t.Fatal(err)
		} else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) {
			t.Fatalf("expected %x got %x", exp, got)
		}
		if got, err := f.Retrieve(6); err != nil {
			t.Fatal(err)
		} else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) {
			t.Fatalf("expected %x got %x", exp, got)
		}

		// It should error at 0, 1, 2 and 3
		for i := 0; i < 4; i++ {
			if _, err := f.Retrieve(uint64(i)); err == nil {
				t.Fatal("expected err")
			}
		}
	}
}

// TODO (?)
// - test that if we remove several head-files, as well as the last data-file,
//   the index is truncated accordingly.
// Right now, the freezer would fail on these conditions:
// 1. have data files d0, d1, d2, d3
// 2. remove d2, d3
//
// However, all 'normal' failure modes arising due to failing to sync() or save a file should be
// handled already, and the case described above can only (?) happen if an external process/user
// deletes files from the filesystem.
@@ -0,0 +1,134 @@
package tablewriter

import (
	"fmt"
	"strconv"
	"strings"
)

const ESC = "\033"
const SEP = ";"

const (
	BgBlackColor int = iota + 40
	BgRedColor
	BgGreenColor
	BgYellowColor
	BgBlueColor
	BgMagentaColor
	BgCyanColor
	BgWhiteColor
)

const (
	FgBlackColor int = iota + 30
	FgRedColor
	FgGreenColor
	FgYellowColor
	FgBlueColor
	FgMagentaColor
	FgCyanColor
	FgWhiteColor
)

const (
	BgHiBlackColor int = iota + 100
	BgHiRedColor
	BgHiGreenColor
	BgHiYellowColor
	BgHiBlueColor
	BgHiMagentaColor
	BgHiCyanColor
	BgHiWhiteColor
)

const (
	FgHiBlackColor int = iota + 90
	FgHiRedColor
	FgHiGreenColor
	FgHiYellowColor
	FgHiBlueColor
	FgHiMagentaColor
	FgHiCyanColor
	FgHiWhiteColor
)

const (
	Normal          = 0
	Bold            = 1
	UnderlineSingle = 4
	Italic          // NB: no explicit value, so this repeats the previous expression and is also 4
)

type Colors []int

func startFormat(seq string) string {
	return fmt.Sprintf("%s[%sm", ESC, seq)
}

func stopFormat() string {
	return fmt.Sprintf("%s[%dm", ESC, Normal)
}

// makeSequence builds the SGR (Select Graphic Rendition) sequence.
func makeSequence(codes []int) string {
	codesInString := []string{}
	for _, code := range codes {
		codesInString = append(codesInString, strconv.Itoa(code))
	}
	return strings.Join(codesInString, SEP)
}

// format adds ANSI escape sequences before and after the string.
func format(s string, codes interface{}) string {
	var seq string

	switch v := codes.(type) {
	case string:
		seq = v
	case []int:
		seq = makeSequence(v)
	default:
		return s
	}

	if len(seq) == 0 {
		return s
	}
	return startFormat(seq) + s + stopFormat()
}
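
A small usage sketch (not part of the original file; it would live inside package tablewriter). Note that the type switch above matches the concrete type []int rather than the named Colors type, so the codes are passed as a plain slice:

	func exampleFormat() string {
		// Bold (1) + red foreground (31) -> "\033[1;31merror\033[0m"
		return format("error", []int{Bold, FgRedColor})
	}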

// SetHeaderColor sets the header colors (ANSI codes).
func (t *Table) SetHeaderColor(colors ...Colors) {
	if t.colSize != len(colors) {
		panic("Number of header colors must be equal to number of headers.")
	}
	for i := 0; i < len(colors); i++ {
		t.headerParams = append(t.headerParams, makeSequence(colors[i]))
	}
}

// SetColumnColor sets the column colors (ANSI codes).
func (t *Table) SetColumnColor(colors ...Colors) {
	if t.colSize != len(colors) {
		panic("Number of column colors must be equal to number of headers.")
	}
	for i := 0; i < len(colors); i++ {
		t.columnsParams = append(t.columnsParams, makeSequence(colors[i]))
	}
}

// SetFooterColor sets the footer colors (ANSI codes).
func (t *Table) SetFooterColor(colors ...Colors) {
	if len(t.footers) != len(colors) {
		panic("Number of footer colors must be equal to number of footers.")
	}
	for i := 0; i < len(colors); i++ {
		t.footerParams = append(t.footerParams, makeSequence(colors[i]))
	}
}

func Color(colors ...int) []int {
	return colors
}