// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package rawdb import ( "bufio" "bytes" "encoding/binary" "errors" "fmt" "io" "os" "path/filepath" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/golang/snappy" ) var ( // errClosed is returned if an operation attempts to read from or write to the // freezer table after it has already been closed. errClosed = errors.New("closed") // errOutOfBounds is returned if the item requested is not contained within the // freezer table. errOutOfBounds = errors.New("out of bounds") // errNotSupported is returned if the database doesn't support the required operation. errNotSupported = errors.New("this operation is not supported") ) // indexEntry contains the number/id of the file that the data resides in, as well as the // offset within the file to the end of the data. // In serialized form, the filenum is stored as uint16. type indexEntry struct { filenum uint32 // stored as uint16 ( 2 bytes ) offset uint32 // stored as uint32 ( 4 bytes ) } const indexEntrySize = 6 // unmarshalBinary deserializes binary b into the rawIndex entry. func (i *indexEntry) unmarshalBinary(b []byte) { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) } // append adds the encoded entry to the end of b. func (i *indexEntry) append(b []byte) []byte { offset := len(b) out := append(b, make([]byte, indexEntrySize)...) binary.BigEndian.PutUint16(out[offset:], uint16(i.filenum)) binary.BigEndian.PutUint32(out[offset+2:], i.offset) return out } // bounds returns the start- and end- offsets, and the file number of where to // read there data item marked by the two index entries. The two entries are // assumed to be sequential. func (i *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) { if i.filenum != end.filenum { // If a piece of data 'crosses' a data-file, // it's actually in one piece on the second data-file. // We return a zero-indexEntry for the second file as start return 0, end.offset, end.filenum } return i.offset, end.offset, end.filenum } // freezerTable represents a single chained data table within the freezer (e.g. blocks). // It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry // file (uncompressed 64 bit indices into the data file). type freezerTable struct { items atomic.Uint64 // Number of items stored in the table (including items removed from tail) itemOffset atomic.Uint64 // Number of items removed from the table // itemHidden is the number of items marked as deleted. Tail deletion is // only supported at file level which means the actual deletion will be // delayed until the entire data file is marked as deleted. Before that // these items will be hidden to prevent being visited again. The value // should never be lower than itemOffset. itemHidden atomic.Uint64 noCompression bool // if true, disables snappy compression. Note: does not work retroactively readonly bool maxFileSize uint32 // Max file size for data-files name string path string head *os.File // File descriptor for the data head of the table index *os.File // File descriptor for the indexEntry file of the table meta *os.File // File descriptor for metadata of the table files map[uint32]*os.File // open files headId uint32 // number of the currently active head file tailId uint32 // number of the earliest file headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables logger log.Logger // Logger with database path and table name embedded lock sync.RWMutex // Mutex protecting the data file descriptors } // newFreezerTable opens the given path as a freezer table. func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) { return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly) } // newTable opens a freezer table, creating the data and index files if they are // non-existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err } var idxName string if noCompression { idxName = fmt.Sprintf("%s.ridx", name) // raw index file } else { idxName = fmt.Sprintf("%s.cidx", name) // compressed index file } var ( err error index *os.File meta *os.File ) if readonly { // Will fail if table index file or meta file is not existent index, err = openFreezerFileForReadOnly(filepath.Join(path, idxName)) if err != nil { return nil, err } meta, err = openFreezerFileForReadOnly(filepath.Join(path, fmt.Sprintf("%s.meta", name))) if err != nil { return nil, err } } else { index, err = openFreezerFileForAppend(filepath.Join(path, idxName)) if err != nil { return nil, err } meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name))) if err != nil { return nil, err } } // Create the table and repair any past inconsistency tab := &freezerTable{ index: index, meta: meta, files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, sizeGauge: sizeGauge, name: name, path: path, logger: log.New("database", path, "table", name), noCompression: noCompression, readonly: readonly, maxFileSize: maxFilesize, } if err := tab.repair(); err != nil { tab.Close() return nil, err } // Initialize the starting size counter size, err := tab.sizeNolock() if err != nil { tab.Close() return nil, err } tab.sizeGauge.Inc(int64(size)) return tab, nil } // repair cross-checks the head and the index file and truncates them to // be in sync with each other after a potential crash / data loss. func (t *freezerTable) repair() error { // Create a temporary offset buffer to init files with and read indexEntry into buffer := make([]byte, indexEntrySize) // If we've just created the files, initialize the index with the 0 indexEntry stat, err := t.index.Stat() if err != nil { return err } if stat.Size() == 0 { if _, err := t.index.Write(buffer); err != nil { return err } } // Ensure the index is a multiple of indexEntrySize bytes if overflow := stat.Size() % indexEntrySize; overflow != 0 { if t.readonly { return fmt.Errorf("index file(path: %s, name: %s) size is not a multiple of %d", t.path, t.name, indexEntrySize) } if err := truncateFreezerFile(t.index, stat.Size()-overflow); err != nil { return err } // New file can't trigger this path } // Validate the index file as it might contain some garbage data after the // power failures. if err := t.repairIndex(); err != nil { return err } // Retrieve the file sizes and prepare for truncation. Note the file size // might be changed after index validation. if stat, err = t.index.Stat(); err != nil { return err } offsetsSize := stat.Size() // Open the head file var ( firstIndex indexEntry lastIndex indexEntry contentSize int64 contentExp int64 verbose bool ) // Read index zero, determine what file is the earliest // and what item offset to use t.index.ReadAt(buffer, 0) firstIndex.unmarshalBinary(buffer) // Assign the tail fields with the first stored index. // The total removed items is represented with an uint32, // which is not enough in theory but enough in practice. // TODO: use uint64 to represent total removed items. t.tailId = firstIndex.filenum t.itemOffset.Store(uint64(firstIndex.offset)) // Load metadata from the file meta, err := loadMetadata(t.meta, t.itemOffset.Load()) if err != nil { return err } t.itemHidden.Store(meta.VirtualTail) // Read the last index, use the default value in case the freezer is empty if offsetsSize == indexEntrySize { lastIndex = indexEntry{filenum: t.tailId, offset: 0} } else { t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) } // Print an error log if the index is corrupted due to an incorrect // last index item. While it is theoretically possible to have a zero offset // by storing all zero-size items, it is highly unlikely to occur in practice. if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 { log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize) } if t.readonly { t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) } else { t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) } if err != nil { return err } if stat, err = t.head.Stat(); err != nil { return err } contentSize = stat.Size() // Keep truncating both files until they come in sync contentExp = int64(lastIndex.offset) for contentExp != contentSize { if t.readonly { return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum) } verbose = true // Truncate the head file to the last offset pointer if contentExp < contentSize { t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize) if err := truncateFreezerFile(t.head, contentExp); err != nil { return err } contentSize = contentExp } // Truncate the index to point within the head file if contentExp > contentSize { t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize) if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } offsetsSize -= indexEntrySize // Read the new head index, use the default value in case // the freezer is already empty. var newLastIndex indexEntry if offsetsSize == indexEntrySize { newLastIndex = indexEntry{filenum: t.tailId, offset: 0} } else { t.index.ReadAt(buffer, offsetsSize-indexEntrySize) newLastIndex.unmarshalBinary(buffer) } // We might have slipped back into an earlier head-file here if newLastIndex.filenum != lastIndex.filenum { // Release earlier opened file t.releaseFile(lastIndex.filenum) if t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend); err != nil { return err } if stat, err = t.head.Stat(); err != nil { // TODO, anything more we can do here? // A data file has gone missing... return err } contentSize = stat.Size() } lastIndex = newLastIndex contentExp = int64(lastIndex.offset) } } // Sync() fails for read-only files on windows. if !t.readonly { // Ensure all reparation changes have been written to disk if err := t.index.Sync(); err != nil { return err } if err := t.head.Sync(); err != nil { return err } if err := t.meta.Sync(); err != nil { return err } } // Update the item and byte counters and return t.items.Store(t.itemOffset.Load() + uint64(offsetsSize/indexEntrySize-1)) // last indexEntry points to the end of the data file t.headBytes = contentSize t.headId = lastIndex.filenum // Delete the leftover files because of head deletion t.releaseFilesAfter(t.headId, true) // Delete the leftover files because of tail deletion t.releaseFilesBefore(t.tailId, true) // Close opened files and preopen all files if err := t.preopen(); err != nil { return err } if verbose { t.logger.Info("Chain freezer table opened", "items", t.items.Load(), "deleted", t.itemOffset.Load(), "hidden", t.itemHidden.Load(), "tailId", t.tailId, "headId", t.headId, "size", t.headBytes) } else { t.logger.Debug("Chain freezer table opened", "items", t.items.Load(), "size", common.StorageSize(t.headBytes)) } return nil } // repairIndex validates the integrity of the index file. According to the design, // the initial entry in the file denotes the earliest data file along with the // count of deleted items. Following this, all subsequent entries in the file must // be in order. This function identifies any corrupted entries and truncates items // occurring after the corruption point. // // corruption can occur because of the power failure. In the Linux kernel, the // file metadata update and data update are not necessarily performed at the // same time. Typically, the metadata will be flushed/journalled ahead of the file // data. Therefore, we make the pessimistic assumption that the file is first // extended with invalid "garbage" data (normally zero bytes) and that afterwards // the correct data replaces the garbage. As all the items in index file are // supposed to be in-order, the leftover garbage must be truncated before the // index data is utilized. // // It's important to note an exception that's unfortunately undetectable: when // all index entries in the file are zero. Distinguishing whether they represent // leftover garbage or if all items in the table have zero size is impossible. // In such instances, the file will remain unchanged to prevent potential data // loss or misinterpretation. func (t *freezerTable) repairIndex() error { // Retrieve the file sizes and prepare for validation stat, err := t.index.Stat() if err != nil { return err } size := stat.Size() // Move the read cursor to the beginning of the file _, err = t.index.Seek(0, io.SeekStart) if err != nil { return err } fr := bufio.NewReader(t.index) var ( start = time.Now() buff = make([]byte, indexEntrySize) prev indexEntry head indexEntry read = func() (indexEntry, error) { n, err := io.ReadFull(fr, buff) if err != nil { return indexEntry{}, err } if n != indexEntrySize { return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n) } var entry indexEntry entry.unmarshalBinary(buff) return entry, nil } truncate = func(offset int64) error { if t.readonly { return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) } if err := truncateFreezerFile(t.index, offset); err != nil { return err } log.Warn("Truncated index file", "offset", offset, "truncated", size-offset) return nil } ) for offset := int64(0); offset < size; offset += indexEntrySize { entry, err := read() if err != nil { return err } if offset == 0 { head = entry continue } // Ensure that the first non-head index refers to the earliest file, // or the next file if the earliest file has no space to place the // first item. if offset == indexEntrySize { if entry.filenum != head.filenum && entry.filenum != head.filenum+1 { log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum) return truncate(offset) } prev = entry continue } // ensure two consecutive index items are in order if err := t.checkIndexItems(prev, entry); err != nil { log.Error("Corrupted index item detected", "err", err) return truncate(offset) } prev = entry } // Move the read cursor to the end of the file. While theoretically, the // cursor should reach the end by reading all the items in the file, perform // the seek operation anyway as a precaution. _, err = t.index.Seek(0, io.SeekEnd) if err != nil { return err } log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start))) return nil } // checkIndexItems validates the correctness of two consecutive index items based // on the following rules: // // - The file number of two consecutive index items must either be the same or // increase monotonically. If the file number decreases or skips in a // non-sequential manner, the index item is considered invalid. // // - For index items with the same file number, the data offset must be in // non-decreasing order. Note: Two index items with the same file number // and the same data offset are permitted if the entry size is zero. // // - The first index item in a new data file must not have a zero data offset. func (t *freezerTable) checkIndexItems(a, b indexEntry) error { if b.filenum != a.filenum && b.filenum != a.filenum+1 { return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum) } if b.filenum == a.filenum && b.offset < a.offset { return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset) } if b.filenum == a.filenum+1 && b.offset == 0 { return fmt.Errorf("index items with zero offset, file number: %d", b.filenum) } return nil } // preopen opens all files that the freezer will need. This method should be called from an init-context, // since it assumes that it doesn't have to bother with locking // The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever // obtain a write-lock within Retrieve. func (t *freezerTable) preopen() (err error) { // The repair might have already opened (some) files t.releaseFilesAfter(0, false) // Open all except head in RDONLY for i := t.tailId; i < t.headId; i++ { if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { return err } } if t.readonly { t.head, err = t.openFile(t.headId, openFreezerFileForReadOnly) } else { // Open head in read/write t.head, err = t.openFile(t.headId, openFreezerFileForAppend) } return err } // truncateHead discards any recent data above the provided threshold number. func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() defer t.lock.Unlock() // Ensure the given truncate target falls in the correct range existing := t.items.Load() if existing <= items { return nil } if items < t.itemHidden.Load() { return errors.New("truncation below tail") } // We need to truncate, save the old size for metrics tracking oldSize, err := t.sizeNolock() if err != nil { return err } // Something's out of sync, truncate the table's offset index log := t.logger.Debug if existing > items+1 { log = t.logger.Warn // Only loud warn if we delete multiple items } log("Truncating freezer table", "items", existing, "limit", items) // Truncate the index file first, the tail position is also considered // when calculating the new freezer table length. length := items - t.itemOffset.Load() if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { return err } if err := t.index.Sync(); err != nil { return err } // Calculate the new expected size of the data file and truncate it var expected indexEntry if length == 0 { expected = indexEntry{filenum: t.tailId, offset: 0} } else { buffer := make([]byte, indexEntrySize) if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil { return err } expected.unmarshalBinary(buffer) } // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing t.releaseFile(expected.filenum) newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend) if err != nil { return err } // Release any files _after the current head -- both the previous head // and any files which may have been opened for reading t.releaseFilesAfter(expected.filenum, true) // Set back the historic head t.head = newHead t.headId = expected.filenum } if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { return err } if err := t.head.Sync(); err != nil { return err } // All data files truncated, set internal counters and return t.headBytes = int64(expected.offset) t.items.Store(items) // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() if err != nil { return err } t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } // sizeHidden returns the total data size of hidden items in the freezer table. // This function assumes the lock is already held. func (t *freezerTable) sizeHidden() (uint64, error) { hidden, offset := t.itemHidden.Load(), t.itemOffset.Load() if hidden <= offset { return 0, nil } indices, err := t.getIndices(hidden-1, 1) if err != nil { return 0, err } return uint64(indices[1].offset), nil } // truncateTail discards any recent data before the provided threshold number. func (t *freezerTable) truncateTail(items uint64) error { t.lock.Lock() defer t.lock.Unlock() // Ensure the given truncate target falls in the correct range if t.itemHidden.Load() >= items { return nil } if t.items.Load() < items { return errors.New("truncation above head") } // Load the new tail index by the given new tail position var ( newTailId uint32 buffer = make([]byte, indexEntrySize) ) if t.items.Load() == items { newTailId = t.headId } else { offset := items - t.itemOffset.Load() if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil { return err } var newTail indexEntry newTail.unmarshalBinary(buffer) newTailId = newTail.filenum } // Save the old size for metrics tracking. This needs to be done // before any updates to either itemHidden or itemOffset. oldSize, err := t.sizeNolock() if err != nil { return err } // Update the virtual tail marker and hidden these entries in table. t.itemHidden.Store(items) if err := writeMetadata(t.meta, newMetadata(items)); err != nil { return err } // Hidden items still fall in the current tail file, no data file // can be dropped. if t.tailId == newTailId { return nil } // Hidden items fall in the incorrect range, returns the error. if t.tailId > newTailId { return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) } // Count how many items can be deleted from the file. var ( newDeleted = items deleted = t.itemOffset.Load() ) // Hidden items exceed the current tail file, drop the relevant data files. for current := items - 1; current >= deleted; current -= 1 { if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil { return err } var pre indexEntry pre.unmarshalBinary(buffer) if pre.filenum != newTailId { break } newDeleted = current } // Commit the changes of metadata file first before manipulating // the indexes file. if err := t.meta.Sync(); err != nil { return err } // Close the index file before shorten it. if err := t.index.Close(); err != nil { return err } // Truncate the deleted index entries from the index file. err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error { tailIndex := indexEntry{ filenum: newTailId, offset: uint32(newDeleted), } _, err := f.Write(tailIndex.append(nil)) return err }) if err != nil { return err } // Reopen the modified index file to load the changes t.index, err = openFreezerFileForAppend(t.index.Name()) if err != nil { return err } // Sync the file to ensure changes are flushed to disk if err := t.index.Sync(); err != nil { return err } // Release any files before the current tail t.tailId = newTailId t.itemOffset.Store(newDeleted) t.releaseFilesBefore(t.tailId, true) // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() if err != nil { return err } t.sizeGauge.Dec(int64(oldSize - newSize)) return nil } // Close closes all opened files. func (t *freezerTable) Close() error { t.lock.Lock() defer t.lock.Unlock() var errs []error doClose := func(f *os.File, sync bool, close bool) { if sync && !t.readonly { if err := f.Sync(); err != nil { errs = append(errs, err) } } if close { if err := f.Close(); err != nil { errs = append(errs, err) } } } // Trying to fsync a file opened in rdonly causes "Access denied" // error on Windows. doClose(t.index, true, true) doClose(t.meta, true, true) // The preopened non-head data-files are all opened in readonly. // The head is opened in rw-mode, so we sync it here - but since it's also // part of t.files, it will be closed in the loop below. doClose(t.head, true, false) // sync but do not close for _, f := range t.files { doClose(f, false, true) // close but do not sync } t.index = nil t.meta = nil t.head = nil if errs != nil { return fmt.Errorf("%v", errs) } return nil } // openFile assumes that the write-lock is held by the caller func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) { var exist bool if f, exist = t.files[num]; !exist { var name string if t.noCompression { name = fmt.Sprintf("%s.%04d.rdat", t.name, num) } else { name = fmt.Sprintf("%s.%04d.cdat", t.name, num) } f, err = opener(filepath.Join(t.path, name)) if err != nil { return nil, err } t.files[num] = f } return f, err } // releaseFile closes a file, and removes it from the open file cache. // Assumes that the caller holds the write lock func (t *freezerTable) releaseFile(num uint32) { if f, exist := t.files[num]; exist { delete(t.files, num) f.Close() } } // releaseFilesAfter closes all open files with a higher number, and optionally also deletes the files func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { for fnum, f := range t.files { if fnum > num { delete(t.files, fnum) f.Close() if remove { os.Remove(f.Name()) } } } } // releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { for fnum, f := range t.files { if fnum < num { delete(t.files, fnum) f.Close() if remove { os.Remove(f.Name()) } } } } // getIndices returns the index entries for the given from-item, covering 'count' items. // N.B: The actual number of returned indices for N items will always be N+1 (unless an // error is returned). // OBS: This method assumes that the caller has already verified (and/or trimmed) the range // so that the items are within bounds. If this method is used to read out of bounds, // it will return error. func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) { // Apply the table-offset from = from - t.itemOffset.Load() // For reading N items, we need N+1 indices. buffer := make([]byte, (count+1)*indexEntrySize) if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil { return nil, err } var ( indices []*indexEntry offset int ) for i := from; i <= from+count; i++ { index := new(indexEntry) index.unmarshalBinary(buffer[offset:]) offset += indexEntrySize indices = append(indices, index) } if from == 0 { // Special case if we're reading the first item in the freezer. We assume that // the first item always start from zero(regarding the deletion, we // only support deletion by files, so that the assumption is held). // This means we can use the first item metadata to carry information about // the 'global' offset, for the deletion-case indices[0].offset = 0 indices[0].filenum = indices[1].filenum } return indices, nil } // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { items, err := t.RetrieveItems(item, 1, 0) if err != nil { return nil, err } return items[0], nil } // RetrieveItems returns multiple items in sequence, starting from the index 'start'. // It will return at most 'max' items, but will abort earlier to respect the // 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one // item, it _will_ return one element and possibly overflow the maxBytes. func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) { // First we read the 'raw' data, which might be compressed. diskData, sizes, err := t.retrieveItems(start, count, maxBytes) if err != nil { return nil, err } var ( output = make([][]byte, 0, count) offset int // offset for reading outputSize int // size of uncompressed data ) // Now slice up the data and decompress. for i, diskSize := range sizes { item := diskData[offset : offset+diskSize] offset += diskSize decompressedSize := diskSize if !t.noCompression { decompressedSize, _ = snappy.DecodedLen(item) } if i > 0 && maxBytes != 0 && uint64(outputSize+decompressedSize) > maxBytes { break } if !t.noCompression { data, err := snappy.Decode(nil, item) if err != nil { return nil, err } output = append(output, data) } else { output = append(output, item) } outputSize += decompressedSize } return output, nil } // retrieveItems reads up to 'count' items from the table. It reads at least // one item, but otherwise avoids reading more than maxBytes bytes. Freezer // will ignore the size limitation and continuously allocate memory to store // data if maxBytes is 0. It returns the (potentially compressed) data, and // the sizes. func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) { t.lock.RLock() defer t.lock.RUnlock() // Ensure the table and the item are accessible if t.index == nil || t.head == nil || t.meta == nil { return nil, nil, errClosed } var ( items = t.items.Load() // the total items(head + 1) hidden = t.itemHidden.Load() // the number of hidden items ) // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something if items <= start || hidden > start || count == 0 { return nil, nil, errOutOfBounds } if start+count > items { count = items - start } var output []byte // Buffer to read data into if maxBytes != 0 { output = make([]byte, 0, maxBytes) } else { output = make([]byte, 0, 1024) // initial buffer cap } // readData is a helper method to read a single data item from disk. readData := func(fileId, start uint32, length int) error { output = grow(output, length) dataFile, exist := t.files[fileId] if !exist { return fmt.Errorf("missing data file %d", fileId) } if _, err := dataFile.ReadAt(output[len(output)-length:], int64(start)); err != nil { return fmt.Errorf("%w, fileid: %d, start: %d, length: %d", err, fileId, start, length) } return nil } // Read all the indexes in one go indices, err := t.getIndices(start, count) if err != nil { return nil, nil, err } var ( sizes []int // The sizes for each element totalSize = 0 // The total size of all data read so far readStart = indices[0].offset // Where, in the file, to start reading unreadSize = 0 // The size of the as-yet-unread data ) for i, firstIndex := range indices[:len(indices)-1] { secondIndex := indices[i+1] // Determine the size of the item. offset1, offset2, _ := firstIndex.bounds(secondIndex) size := int(offset2 - offset1) // Crossing a file boundary? if secondIndex.filenum != firstIndex.filenum { // If we have unread data in the first file, we need to do that read now. if unreadSize > 0 { if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil { return nil, nil, err } unreadSize = 0 } readStart = 0 } if i > 0 && uint64(totalSize+size) > maxBytes && maxBytes != 0 { // About to break out due to byte limit being exceeded. We don't // read this last item, but we need to do the deferred reads now. if unreadSize > 0 { if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { return nil, nil, err } } break } // Defer the read for later unreadSize += size totalSize += size sizes = append(sizes, size) if i == len(indices)-2 || (uint64(totalSize) > maxBytes && maxBytes != 0) { // Last item, need to do the read now if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil { return nil, nil, err } break } } // Update metrics. t.readMeter.Mark(int64(totalSize)) return output, sizes, nil } // has returns an indicator whether the specified number data is still accessible // in the freezer table. func (t *freezerTable) has(number uint64) bool { return t.items.Load() > number && t.itemHidden.Load() <= number } // size returns the total data size in the freezer table. func (t *freezerTable) size() (uint64, error) { t.lock.RLock() defer t.lock.RUnlock() return t.sizeNolock() } // sizeNolock returns the total data size in the freezer table. This function // assumes the lock is already held. func (t *freezerTable) sizeNolock() (uint64, error) { stat, err := t.index.Stat() if err != nil { return 0, err } hidden, err := t.sizeHidden() if err != nil { return 0, err } total := uint64(t.maxFileSize)*uint64(t.headId-t.tailId) + uint64(t.headBytes) + uint64(stat.Size()) - hidden return total, nil } // advanceHead should be called when the current head file would outgrow the file limits, // and a new file must be opened. The caller of this method must hold the write-lock // before calling this method. func (t *freezerTable) advanceHead() error { t.lock.Lock() defer t.lock.Unlock() // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it. nextID := t.headId + 1 newHead, err := t.openFile(nextID, openFreezerFileTruncated) if err != nil { return err } // Commit the contents of the old file to stable storage and // tear it down. It will be re-opened in read-only mode. if err := t.head.Sync(); err != nil { return err } t.releaseFile(t.headId) t.openFile(t.headId, openFreezerFileForReadOnly) // Swap out the current head. t.head = newHead t.headBytes = 0 t.headId = nextID return nil } // Sync pushes any pending data from memory out to disk. This is an expensive // operation, so use it with care. func (t *freezerTable) Sync() error { t.lock.Lock() defer t.lock.Unlock() if t.index == nil || t.head == nil || t.meta == nil { return errClosed } var err error trackError := func(e error) { if e != nil && err == nil { err = e } } trackError(t.index.Sync()) trackError(t.meta.Sync()) trackError(t.head.Sync()) return err } func (t *freezerTable) dumpIndexStdout(start, stop int64) { t.dumpIndex(os.Stdout, start, stop) } func (t *freezerTable) dumpIndexString(start, stop int64) string { var out bytes.Buffer out.WriteString("\n") t.dumpIndex(&out, start, stop) return out.String() } func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { meta, err := readMetadata(t.meta) if err != nil { fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) return } fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load()) buf := make([]byte, indexEntrySize) fmt.Fprintf(w, "| number | fileno | offset |\n") fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil { break } var entry indexEntry entry.unmarshalBinary(buf) fmt.Fprintf(w, "| %03d | %03d | %03d | \n", i, entry.filenum, entry.offset) if stop > 0 && i >= uint64(stop) { break } } fmt.Fprintf(w, "|--------------------------|\n") }