|
|
|
@ -108,11 +108,13 @@ type freezerTable struct { |
|
|
|
|
|
|
|
|
|
head *os.File // File descriptor for the data head of the table
|
|
|
|
|
index *os.File // File descriptor for the indexEntry file of the table
|
|
|
|
|
meta *os.File // File descriptor for metadata of the table
|
|
|
|
|
files map[uint32]*os.File // open files
|
|
|
|
|
headId uint32 // number of the currently active head file
|
|
|
|
|
tailId uint32 // number of the earliest file
|
|
|
|
|
|
|
|
|
|
metadata *freezerTableMeta // metadata of the table
|
|
|
|
|
lastSync time.Time // Timestamp when the last sync was performed
|
|
|
|
|
|
|
|
|
|
headBytes int64 // Number of bytes written to the head file
|
|
|
|
|
readMeter *metrics.Meter // Meter for measuring the effective amount of data read
|
|
|
|
|
writeMeter *metrics.Meter // Meter for measuring the effective amount of data written
|
|
|
|
@ -166,10 +168,17 @@ func newTable(path string, name string, readMeter, writeMeter *metrics.Meter, si |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Load metadata from the file. The tag will be true if legacy metadata
|
|
|
|
|
// is detected.
|
|
|
|
|
metadata, err := newMetadata(meta) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
// Create the table and repair any past inconsistency
|
|
|
|
|
tab := &freezerTable{ |
|
|
|
|
index: index, |
|
|
|
|
meta: meta, |
|
|
|
|
metadata: metadata, |
|
|
|
|
lastSync: time.Now(), |
|
|
|
|
files: make(map[uint32]*os.File), |
|
|
|
|
readMeter: readMeter, |
|
|
|
|
writeMeter: writeMeter, |
|
|
|
@ -221,13 +230,11 @@ func (t *freezerTable) repair() error { |
|
|
|
|
return err |
|
|
|
|
} // New file can't trigger this path
|
|
|
|
|
} |
|
|
|
|
// Validate the index file as it might contain some garbage data after the
|
|
|
|
|
// power failures.
|
|
|
|
|
if err := t.repairIndex(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Retrieve the file sizes and prepare for truncation. Note the file size
|
|
|
|
|
// might be changed after index validation.
|
|
|
|
|
// might be changed after index repair.
|
|
|
|
|
if stat, err = t.index.Stat(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -253,12 +260,14 @@ func (t *freezerTable) repair() error { |
|
|
|
|
t.tailId = firstIndex.filenum |
|
|
|
|
t.itemOffset.Store(uint64(firstIndex.offset)) |
|
|
|
|
|
|
|
|
|
// Load metadata from the file
|
|
|
|
|
meta, err := loadMetadata(t.meta, t.itemOffset.Load()) |
|
|
|
|
if err != nil { |
|
|
|
|
// Adjust the number of hidden items if it is less than the number of items
|
|
|
|
|
// being removed.
|
|
|
|
|
if t.itemOffset.Load() > t.metadata.virtualTail { |
|
|
|
|
if err := t.metadata.setVirtualTail(t.itemOffset.Load(), true); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
t.itemHidden.Store(meta.VirtualTail) |
|
|
|
|
} |
|
|
|
|
t.itemHidden.Store(t.metadata.virtualTail) |
|
|
|
|
|
|
|
|
|
// Read the last index, use the default value in case the freezer is empty
|
|
|
|
|
if offsetsSize == indexEntrySize { |
|
|
|
@ -267,12 +276,6 @@ func (t *freezerTable) repair() error { |
|
|
|
|
t.index.ReadAt(buffer, offsetsSize-indexEntrySize) |
|
|
|
|
lastIndex.unmarshalBinary(buffer) |
|
|
|
|
} |
|
|
|
|
// Print an error log if the index is corrupted due to an incorrect
|
|
|
|
|
// last index item. While it is theoretically possible to have a zero offset
|
|
|
|
|
// by storing all zero-size items, it is highly unlikely to occur in practice.
|
|
|
|
|
if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 { |
|
|
|
|
log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize) |
|
|
|
|
} |
|
|
|
|
if t.readonly { |
|
|
|
|
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) |
|
|
|
|
} else { |
|
|
|
@ -293,6 +296,7 @@ func (t *freezerTable) repair() error { |
|
|
|
|
return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum) |
|
|
|
|
} |
|
|
|
|
verbose = true |
|
|
|
|
|
|
|
|
|
// Truncate the head file to the last offset pointer
|
|
|
|
|
if contentExp < contentSize { |
|
|
|
|
t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize) |
|
|
|
@ -304,11 +308,23 @@ func (t *freezerTable) repair() error { |
|
|
|
|
// Truncate the index to point within the head file
|
|
|
|
|
if contentExp > contentSize { |
|
|
|
|
t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize) |
|
|
|
|
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { |
|
|
|
|
|
|
|
|
|
newOffset := offsetsSize - indexEntrySize |
|
|
|
|
if err := truncateFreezerFile(t.index, newOffset); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
offsetsSize -= indexEntrySize |
|
|
|
|
|
|
|
|
|
// If the index file is truncated beyond the flush offset, move the flush
|
|
|
|
|
// offset back to the new end of the file. A crash may occur before the
|
|
|
|
|
// offset is updated, leaving a dangling reference that points to a position
|
|
|
|
|
// outside the file. If so, the offset will be reset to the new end of the
|
|
|
|
|
// file during the next run.
|
|
|
|
|
if t.metadata.flushOffset < uint64(newOffset) { |
|
|
|
|
if err := t.metadata.setFlushOffset(uint64(newOffset), true); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Read the new head index, use the default value in case
|
|
|
|
|
// the freezer is already empty.
|
|
|
|
|
var newLastIndex indexEntry |
|
|
|
@ -345,7 +361,7 @@ func (t *freezerTable) repair() error { |
|
|
|
|
if err := t.head.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := t.meta.Sync(); err != nil { |
|
|
|
|
if err := t.metadata.file.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -372,7 +388,60 @@ func (t *freezerTable) repair() error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// repairIndex validates the integrity of the index file. According to the design,
|
|
|
|
|
func (t *freezerTable) repairIndex() error { |
|
|
|
|
stat, err := t.index.Stat() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
offset := stat.Size() |
|
|
|
|
|
|
|
|
|
// Validate the items in the index file to ensure the data integrity.
|
|
|
|
|
// It's possible some garbage data is retained in the index file after
|
|
|
|
|
// the power failures and should be truncated first.
|
|
|
|
|
offset, err = t.checkIndex(offset) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// If legacy metadata is detected, attempt to recover the offset from the
|
|
|
|
|
// index file to avoid clearing the entire table.
|
|
|
|
|
if t.metadata.version == freezerTableV1 { |
|
|
|
|
t.logger.Info("Recover the flush offset for legacy table", "offset", offset) |
|
|
|
|
return t.metadata.setFlushOffset(uint64(offset), true) |
|
|
|
|
} |
|
|
|
|
// Move the flush offset to the end of the file for fresh new freezer table
|
|
|
|
|
if offset == indexEntrySize && t.metadata.flushOffset == 0 { |
|
|
|
|
return t.metadata.setFlushOffset(uint64(offset), true) |
|
|
|
|
} |
|
|
|
|
// Short circuit if the offset is aligned with the index file
|
|
|
|
|
if offset == int64(t.metadata.flushOffset) { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
// Extra index items have been detected beyond the flush offset. Since these
|
|
|
|
|
// entries correspond to data that has not been fully flushed to disk in the
|
|
|
|
|
// last run (because of unclean shutdown), their integrity cannot be guaranteed.
|
|
|
|
|
// To ensure consistency, these index items will be truncated, as there is no
|
|
|
|
|
// reliable way to validate or recover their associated data.
|
|
|
|
|
if offset > int64(t.metadata.flushOffset) { |
|
|
|
|
size := offset - int64(t.metadata.flushOffset) |
|
|
|
|
if t.readonly { |
|
|
|
|
return fmt.Errorf("index file(path: %s, name: %s) contains garbage data %d", t.path, t.name, size) |
|
|
|
|
} |
|
|
|
|
t.logger.Info("Truncate excessive items", "size", size) |
|
|
|
|
return truncateFreezerFile(t.index, int64(t.metadata.flushOffset)) |
|
|
|
|
} |
|
|
|
|
// Flush offset refers to the position which exceeds the index file. The only
|
|
|
|
|
// possible scenario for this is: power failure or system crash occurs after
|
|
|
|
|
// truncating the segment in index file from head or tail, but without updating
|
|
|
|
|
// the flush offset. In this case, automatically reset the flush offset with
|
|
|
|
|
// the file size which implies the entire index file is complete.
|
|
|
|
|
if t.readonly { |
|
|
|
|
return nil // do nothing in read only mode
|
|
|
|
|
} |
|
|
|
|
t.logger.Info("Rewind the flush offset", "old", t.metadata.flushOffset, "new", offset) |
|
|
|
|
return t.metadata.setFlushOffset(uint64(offset), true) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// checkIndex validates the integrity of the index file. According to the design,
|
|
|
|
|
// the initial entry in the file denotes the earliest data file along with the
|
|
|
|
|
// count of deleted items. Following this, all subsequent entries in the file must
|
|
|
|
|
// be in order. This function identifies any corrupted entries and truncates items
|
|
|
|
@ -392,18 +461,11 @@ func (t *freezerTable) repair() error { |
|
|
|
|
// leftover garbage or if all items in the table have zero size is impossible.
|
|
|
|
|
// In such instances, the file will remain unchanged to prevent potential data
|
|
|
|
|
// loss or misinterpretation.
|
|
|
|
|
func (t *freezerTable) repairIndex() error { |
|
|
|
|
// Retrieve the file sizes and prepare for validation
|
|
|
|
|
stat, err := t.index.Stat() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
size := stat.Size() |
|
|
|
|
|
|
|
|
|
func (t *freezerTable) checkIndex(size int64) (int64, error) { |
|
|
|
|
// Move the read cursor to the beginning of the file
|
|
|
|
|
_, err = t.index.Seek(0, io.SeekStart) |
|
|
|
|
_, err := t.index.Seek(0, io.SeekStart) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return 0, err |
|
|
|
|
} |
|
|
|
|
fr := bufio.NewReader(t.index) |
|
|
|
|
|
|
|
|
@ -425,21 +487,21 @@ func (t *freezerTable) repairIndex() error { |
|
|
|
|
entry.unmarshalBinary(buff) |
|
|
|
|
return entry, nil |
|
|
|
|
} |
|
|
|
|
truncate = func(offset int64) error { |
|
|
|
|
truncate = func(offset int64) (int64, error) { |
|
|
|
|
if t.readonly { |
|
|
|
|
return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) |
|
|
|
|
return 0, fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) |
|
|
|
|
} |
|
|
|
|
if err := truncateFreezerFile(t.index, offset); err != nil { |
|
|
|
|
return err |
|
|
|
|
return 0, err |
|
|
|
|
} |
|
|
|
|
log.Warn("Truncated index file", "offset", offset, "truncated", size-offset) |
|
|
|
|
return nil |
|
|
|
|
return offset, nil |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
for offset := int64(0); offset < size; offset += indexEntrySize { |
|
|
|
|
entry, err := read() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return 0, err |
|
|
|
|
} |
|
|
|
|
if offset == 0 { |
|
|
|
|
head = entry |
|
|
|
@ -468,10 +530,10 @@ func (t *freezerTable) repairIndex() error { |
|
|
|
|
// the seek operation anyway as a precaution.
|
|
|
|
|
_, err = t.index.Seek(0, io.SeekEnd) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return 0, err |
|
|
|
|
} |
|
|
|
|
log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start))) |
|
|
|
|
return nil |
|
|
|
|
return size, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// checkIndexItems validates the correctness of two consecutive index items based
|
|
|
|
@ -550,12 +612,23 @@ func (t *freezerTable) truncateHead(items uint64) error { |
|
|
|
|
// Truncate the index file first, the tail position is also considered
|
|
|
|
|
// when calculating the new freezer table length.
|
|
|
|
|
length := items - t.itemOffset.Load() |
|
|
|
|
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { |
|
|
|
|
newOffset := (length + 1) * indexEntrySize |
|
|
|
|
if err := truncateFreezerFile(t.index, int64(newOffset)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := t.index.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// If the index file is truncated beyond the flush offset, move the flush
|
|
|
|
|
// offset back to the new end of the file. A crash may occur before the
|
|
|
|
|
// offset is updated, leaving a dangling reference that points to a position
|
|
|
|
|
// outside the file. If so, the offset will be reset to the new end of the
|
|
|
|
|
// file during the next run.
|
|
|
|
|
if t.metadata.flushOffset > newOffset { |
|
|
|
|
if err := t.metadata.setFlushOffset(newOffset, true); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Calculate the new expected size of the data file and truncate it
|
|
|
|
|
var expected indexEntry |
|
|
|
|
if length == 0 { |
|
|
|
@ -652,7 +725,10 @@ func (t *freezerTable) truncateTail(items uint64) error { |
|
|
|
|
} |
|
|
|
|
// Update the virtual tail marker and hidden these entries in table.
|
|
|
|
|
t.itemHidden.Store(items) |
|
|
|
|
if err := writeMetadata(t.meta, newMetadata(items)); err != nil { |
|
|
|
|
|
|
|
|
|
// Update the virtual tail without fsync, otherwise it will significantly
|
|
|
|
|
// impact the overall performance.
|
|
|
|
|
if err := t.metadata.setVirtualTail(items, false); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Hidden items still fall in the current tail file, no data file
|
|
|
|
@ -664,6 +740,18 @@ func (t *freezerTable) truncateTail(items uint64) error { |
|
|
|
|
if t.tailId > newTailId { |
|
|
|
|
return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) |
|
|
|
|
} |
|
|
|
|
// Sync the table before performing the index tail truncation. A crash may
|
|
|
|
|
// occur after truncating the index file without updating the flush offset,
|
|
|
|
|
// leaving a dangling offset that points to a position outside the file.
|
|
|
|
|
// The offset will be rewound to the end of file during the next run
|
|
|
|
|
// automatically and implicitly assumes all the items within the file are
|
|
|
|
|
// complete.
|
|
|
|
|
//
|
|
|
|
|
// Therefore, forcibly flush everything above the offset to ensure this
|
|
|
|
|
// assumption is satisfied!
|
|
|
|
|
if err := t.syncWithNoLock(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Count how many items can be deleted from the file.
|
|
|
|
|
var ( |
|
|
|
|
newDeleted = items |
|
|
|
@ -681,11 +769,6 @@ func (t *freezerTable) truncateTail(items uint64) error { |
|
|
|
|
} |
|
|
|
|
newDeleted = current |
|
|
|
|
} |
|
|
|
|
// Commit the changes of metadata file first before manipulating
|
|
|
|
|
// the indexes file.
|
|
|
|
|
if err := t.meta.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Close the index file before shorten it.
|
|
|
|
|
if err := t.index.Close(); err != nil { |
|
|
|
|
return err |
|
|
|
@ -716,6 +799,21 @@ func (t *freezerTable) truncateTail(items uint64) error { |
|
|
|
|
t.itemOffset.Store(newDeleted) |
|
|
|
|
t.releaseFilesBefore(t.tailId, true) |
|
|
|
|
|
|
|
|
|
// Move the index flush offset backward due to the deletion of an index segment.
|
|
|
|
|
// A crash may occur before the offset is updated, leaving a dangling reference
|
|
|
|
|
// that points to a position outside the file. If so, the offset will be reset
|
|
|
|
|
// to the new end of the file during the next run.
|
|
|
|
|
//
|
|
|
|
|
// Note, both the index and head data file has been persisted before performing
|
|
|
|
|
// tail truncation and all the items in these files are regarded as complete.
|
|
|
|
|
shorten := indexEntrySize * (newDeleted - deleted) |
|
|
|
|
if t.metadata.flushOffset <= shorten { |
|
|
|
|
return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten) |
|
|
|
|
} else { |
|
|
|
|
if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Retrieve the new size and update the total size counter
|
|
|
|
|
newSize, err := t.sizeNolock() |
|
|
|
|
if err != nil { |
|
|
|
@ -725,40 +823,30 @@ func (t *freezerTable) truncateTail(items uint64) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Close closes all opened files.
|
|
|
|
|
// Close closes all opened files and finalizes the freezer table for use.
|
|
|
|
|
// This operation must be completed before shutdown to prevent the loss of
|
|
|
|
|
// recent writes.
|
|
|
|
|
func (t *freezerTable) Close() error { |
|
|
|
|
t.lock.Lock() |
|
|
|
|
defer t.lock.Unlock() |
|
|
|
|
|
|
|
|
|
var errs []error |
|
|
|
|
doClose := func(f *os.File, sync bool, close bool) { |
|
|
|
|
if sync && !t.readonly { |
|
|
|
|
if err := f.Sync(); err != nil { |
|
|
|
|
errs = append(errs, err) |
|
|
|
|
} |
|
|
|
|
if err := t.syncWithNoLock(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if close { |
|
|
|
|
var errs []error |
|
|
|
|
doClose := func(f *os.File) { |
|
|
|
|
if err := f.Close(); err != nil { |
|
|
|
|
errs = append(errs, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Trying to fsync a file opened in rdonly causes "Access denied"
|
|
|
|
|
// error on Windows.
|
|
|
|
|
doClose(t.index, true, true) |
|
|
|
|
doClose(t.meta, true, true) |
|
|
|
|
|
|
|
|
|
// The preopened non-head data-files are all opened in readonly.
|
|
|
|
|
// The head is opened in rw-mode, so we sync it here - but since it's also
|
|
|
|
|
// part of t.files, it will be closed in the loop below.
|
|
|
|
|
doClose(t.head, true, false) // sync but do not close
|
|
|
|
|
|
|
|
|
|
doClose(t.index) |
|
|
|
|
doClose(t.metadata.file) |
|
|
|
|
for _, f := range t.files { |
|
|
|
|
doClose(f, false, true) // close but do not sync
|
|
|
|
|
doClose(f) |
|
|
|
|
} |
|
|
|
|
t.index = nil |
|
|
|
|
t.meta = nil |
|
|
|
|
t.head = nil |
|
|
|
|
t.metadata.file = nil |
|
|
|
|
|
|
|
|
|
if errs != nil { |
|
|
|
|
return fmt.Errorf("%v", errs) |
|
|
|
@ -917,7 +1005,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i |
|
|
|
|
defer t.lock.RUnlock() |
|
|
|
|
|
|
|
|
|
// Ensure the table and the item are accessible
|
|
|
|
|
if t.index == nil || t.head == nil || t.meta == nil { |
|
|
|
|
if t.index == nil || t.head == nil || t.metadata.file == nil { |
|
|
|
|
return nil, nil, errClosed |
|
|
|
|
} |
|
|
|
|
var ( |
|
|
|
@ -1042,6 +1130,9 @@ func (t *freezerTable) advanceHead() error { |
|
|
|
|
t.lock.Lock() |
|
|
|
|
defer t.lock.Unlock() |
|
|
|
|
|
|
|
|
|
if err := t.syncWithNoLock(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// We open the next file in truncated mode -- if this file already
|
|
|
|
|
// exists, we need to start over from scratch on it.
|
|
|
|
|
nextID := t.headId + 1 |
|
|
|
@ -1069,7 +1160,19 @@ func (t *freezerTable) advanceHead() error { |
|
|
|
|
func (t *freezerTable) Sync() error { |
|
|
|
|
t.lock.Lock() |
|
|
|
|
defer t.lock.Unlock() |
|
|
|
|
if t.index == nil || t.head == nil || t.meta == nil { |
|
|
|
|
|
|
|
|
|
return t.syncWithNoLock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// syncWithNoLock is the internal version of Sync which assumes the lock is
|
|
|
|
|
// already held.
|
|
|
|
|
func (t *freezerTable) syncWithNoLock() error { |
|
|
|
|
// Trying to fsync a file opened in rdonly causes "Access denied"
|
|
|
|
|
// error on Windows.
|
|
|
|
|
if t.readonly { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
if t.index == nil || t.head == nil || t.metadata.file == nil { |
|
|
|
|
return errClosed |
|
|
|
|
} |
|
|
|
|
var err error |
|
|
|
@ -1078,10 +1181,18 @@ func (t *freezerTable) Sync() error { |
|
|
|
|
err = e |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
trackError(t.index.Sync()) |
|
|
|
|
trackError(t.meta.Sync()) |
|
|
|
|
trackError(t.head.Sync()) |
|
|
|
|
|
|
|
|
|
// A crash may occur before the offset is updated, leaving the offset
|
|
|
|
|
// points to a old position. If so, the extra items above the offset
|
|
|
|
|
// will be truncated during the next run.
|
|
|
|
|
stat, err := t.index.Stat() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
offset := stat.Size() |
|
|
|
|
trackError(t.metadata.setFlushOffset(uint64(offset), true)) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1097,13 +1208,8 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { |
|
|
|
|
meta, err := readMetadata(t.meta) |
|
|
|
|
if err != nil { |
|
|
|
|
fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version, |
|
|
|
|
t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load()) |
|
|
|
|
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", |
|
|
|
|
t.metadata.version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load()) |
|
|
|
|
|
|
|
|
|
buf := make([]byte, indexEntrySize) |
|
|
|
|
|
|
|
|
|