@@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte {
 	return b
 }
 
+// bounds returns the start- and end- offsets, and the file number of where to
+// read the data item marked by the two index entries. The two entries are
+// assumed to be sequential.
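+// If both entries point into the same file, the item spans [start.offset,
+// end.offset) within it; otherwise the item lies whole at the start of the
+// second file.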
+func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
+	if start.filenum != end.filenum {
+		// If a piece of data 'crosses' a data-file,
+		// it's actually in one piece on the second data-file.
+		// We return a zero-indexEntry for the second file as start
+		return 0, end.offset, end.filenum
+	}
+	return start.offset, end.offset, end.filenum
+}
+
 // freezerTable represents a single chained data table within the freezer (e.g. blocks).
 // It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
 // file (uncompressed 64 bit indices into the data file).
@@ -546,84 +559,183 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool
 	return false, nil
 }
 
-// getBounds returns the indexes for the item
-// returns start, end, filenumber and error
-func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
-	buffer := make([]byte, indexEntrySize)
-	var startIdx, endIdx indexEntry
-	// Read second index
-	if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
-		return 0, 0, 0, err
-	}
-	endIdx.unmarshalBinary(buffer)
-	// Read first index (unless it's the very first item)
-	if item != 0 {
-		if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
-			return 0, 0, 0, err
-		}
-		startIdx.unmarshalBinary(buffer)
-	} else {
+// getIndices returns the index entries for the given from-item, covering 'count' items.
+// N.B: The actual number of returned indices for N items will always be N+1 (unless an
+// error is returned).
+// OBS: This method assumes that the caller has already verified (and/or trimmed) the range
+// so that the items are within bounds. If this method is used to read out of bounds,
+// it will return an error.
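+// (Item i is delimited by index entries i and i+1, hence the extra entry.)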
+func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
+	// Apply the table-offset
+	from = from - uint64(t.itemOffset)
+	// For reading N items, we need N+1 indices.
+	buffer := make([]byte, (count+1)*indexEntrySize)
+	if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
+		return nil, err
+	}
+	var (
+		indices []*indexEntry
+		offset  int
+	)
+	for i := from; i <= from+count; i++ {
+		index := new(indexEntry)
+		index.unmarshalBinary(buffer[offset:])
+		offset += indexEntrySize
+		indices = append(indices, index)
+	}
+	if from == 0 {
 		// Special case if we're reading the first item in the freezer. We assume that
 		// the first item always starts from zero (regarding deletion, we only
 		// support deletion by files, so that assumption holds).
 		// This means we can use the first item metadata to carry information about
 		// the 'global' offset, for the deletion-case
-		return 0, endIdx.offset, endIdx.filenum, nil
+		indices[0].offset = 0
+		indices[0].filenum = indices[1].filenum
 	}
-	if startIdx.filenum != endIdx.filenum {
-		// If a piece of data 'crosses' a data-file,
-		// it's actually in one piece on the second data-file.
-		// We return a zero-indexEntry for the second file as start
-		return 0, endIdx.offset, endIdx.filenum, nil
-	}
-	return startIdx.offset, endIdx.offset, endIdx.filenum, nil
+	return indices, nil
 }
 
 // Retrieve looks up the data offset of an item with the given number and retrieves
 // the raw binary blob from the data file.
 func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
-	blob, err := t.retrieve(item)
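+	// A maxBytes of 0 is fine here: retrieveItems always returns at least
+	// one item, even when it overflows the byte limit.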
+	items, err := t.RetrieveItems(item, 1, 0)
 	if err != nil {
 		return nil, err
 	}
-	if t.noCompression {
-		return blob, nil
+	return items[0], nil
+}
+
+// RetrieveItems returns multiple items in sequence, starting from the index 'start'.
+// It will return at most 'count' items, but will abort earlier to respect the
+// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one
+// item, it _will_ return one element and possibly overflow the maxBytes.
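+// The retrieval is two-phase: retrieveItems performs the batched disk reads,
+// and this method slices the raw data up and snappy-decodes it where needed.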
+func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
+	// First we read the 'raw' data, which might be compressed.
+	diskData, sizes, err := t.retrieveItems(start, count, maxBytes)
+	if err != nil {
+		return nil, err
 	}
-	return snappy.Decode(nil, blob)
+	var (
+		output     = make([][]byte, 0, count)
+		offset     int // offset for reading
+		outputSize int // size of uncompressed data
+	)
+	// Now slice up the data and decompress.
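+	// As in retrieveItems, the first item is always delivered, even if it
+	// alone exceeds maxBytes; later items are dropped once the limit is hit.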
+	for i, diskSize := range sizes {
+		item := diskData[offset : offset+diskSize]
+		offset += diskSize
+		decompressedSize := diskSize
+		if !t.noCompression {
+			decompressedSize, _ = snappy.DecodedLen(item)
+		}
+		if i > 0 && uint64(outputSize+decompressedSize) > maxBytes {
+			break
+		}
+		if !t.noCompression {
+			data, err := snappy.Decode(nil, item)
+			if err != nil {
+				return nil, err
+			}
+			output = append(output, data)
+		} else {
+			output = append(output, item)
+		}
+		outputSize += decompressedSize
+	}
+	return output, nil
 }
 
-// retrieve looks up the data offset of an item with the given number and retrieves
-// the raw binary blob from the data file. OBS! This method does not decode
-// compressed data.
-func (t *freezerTable) retrieve(item uint64) ([]byte, error) {
+// retrieveItems reads up to 'count' items from the table. It reads at least
+// one item, but otherwise avoids reading more than maxBytes bytes.
+// It returns the (potentially compressed) data, and the sizes.
+func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 	// Ensure the table and the item are accessible
 	if t.index == nil || t.head == nil {
-		return nil, errClosed
+		return nil, nil, errClosed
 	}
-	if atomic.LoadUint64(&t.items) <= item {
-		return nil, errOutOfBounds
+	itemCount := atomic.LoadUint64(&t.items) // max number
+	// Ensure the start is written, not deleted from the tail, and that the
+	// caller actually wants something
+	if itemCount <= start || uint64(t.itemOffset) > start || count == 0 {
+		return nil, nil, errOutOfBounds
 	}
-	// Ensure the item was not deleted from the tail either
-	if uint64(t.itemOffset) > item {
-		return nil, errOutOfBounds
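+	// Clamp the request to the number of items actually present.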
+	if start+count > itemCount {
+		count = itemCount - start
 	}
-	startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset))
-	if err != nil {
-		return nil, err
+	var (
+		output     = make([]byte, maxBytes) // Buffer to read data into
+		outputSize int                      // Used size of that buffer
+	)
+	// readData is a helper method to read a single data item from disk.
+	readData := func(fileId, start uint32, length int) error {
+		// In case a small limit is used, and the elements are large, we may need
+		// to realloc the read-buffer when reading the first (and only) item.
+		if len(output) < length {
+			output = make([]byte, length)
+		}
+		dataFile, exist := t.files[fileId]
+		if !exist {
+			return fmt.Errorf("missing data file %d", fileId)
+		}
+		if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil {
+			return err
+		}
+		outputSize += length
+		return nil
 	}
-	dataFile, exist := t.files[filenum]
-	if !exist {
-		return nil, fmt.Errorf("missing data file %d", filenum)
+	// Read all the indexes in one go
+	indices, err := t.getIndices(start, count)
+	if err != nil {
+		return nil, nil, err
 	}
-	// Retrieve the data itself, decompress and return
-	blob := make([]byte, endOffset-startOffset)
-	if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
-		return nil, err
+	var (
+		sizes      []int               // The sizes for each element
+		totalSize  = 0                 // The total size of all data read so far
+		readStart  = indices[0].offset // Where, in the file, to start reading
+		unreadSize = 0                 // The size of the as-yet-unread data
+	)
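+	// Reads are coalesced: sizes of consecutive items in the same file are
+	// accumulated in unreadSize and fetched with a single readData call,
+	// flushed whenever a file boundary is crossed or an exit condition is hit.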
+	for i, firstIndex := range indices[:len(indices)-1] {
+		secondIndex := indices[i+1]
+		// Determine the size of the item.
+		offset1, offset2, _ := firstIndex.bounds(secondIndex)
+		size := int(offset2 - offset1)
+		// Crossing a file boundary?
+		if secondIndex.filenum != firstIndex.filenum {
+			// If we have unread data in the first file, we need to do that read now.
+			if unreadSize > 0 {
+				if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
+					return nil, nil, err
+				}
+				unreadSize = 0
+			}
+			readStart = 0
+		}
+		if i > 0 && uint64(totalSize+size) > maxBytes {
+			// About to break out due to byte limit being exceeded. We don't
+			// read this last item, but we need to do the deferred reads now.
+			if unreadSize > 0 {
+				if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
+					return nil, nil, err
+				}
+			}
+			break
+		}
+		// Defer the read for later
+		unreadSize += size
+		totalSize += size
+		sizes = append(sizes, size)
+		if i == len(indices)-2 || uint64(totalSize) > maxBytes {
+			// Last item, need to do the read now
+			if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
+				return nil, nil, err
+			}
+			break
+		}
 	}
-	t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))
-	return blob, nil
+	return output[:outputSize], sizes, nil
 }
 
 // has returns an indicator whether the specified number data