@ -465,35 +465,59 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) {
// Note, this method will *not* flush any data to disk so be sure to explicitly
// fsync before irreversibly deleting data from the database.
func ( t * freezerTable ) Append ( item uint64 , blob [ ] byte ) error {
// Encode the blob before the lock portion
if ! t . noCompression {
blob = snappy . Encode ( nil , blob )
}
// Read lock prevents competition with truncate
t . lock . RLock ( )
retry , err := t . append ( item , blob , false )
if err != nil {
return err
}
if retry {
// Read lock was insufficient, retry with a writelock
_ , err = t . append ( item , blob , true )
}
return err
}
// append injects a binary blob at the end of the freezer table.
// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to
// false.
// However, if the data will grown the current file out of bounds, then this
// method will return 'true, nil', indicating that the caller should retry, this time
// with 'wlock' set to true.
func ( t * freezerTable ) append ( item uint64 , encodedBlob [ ] byte , wlock bool ) ( bool , error ) {
if wlock {
t . lock . Lock ( )
defer t . lock . Unlock ( )
} else {
t . lock . RLock ( )
defer t . lock . RUnlock ( )
}
// Ensure the table is still accessible
if t . index == nil || t . head == nil {
t . lock . RUnlock ( )
return errClosed
return false , errClosed
}
// Ensure only the next item can be written, nothing else
if atomic . LoadUint64 ( & t . items ) != item {
t . lock . RUnlock ( )
return fmt . Errorf ( "appending unexpected item: want %d, have %d" , t . items , item )
}
// Encode the blob and write it into the data file
if ! t . noCompression {
blob = snappy . Encode ( nil , blob )
return false , fmt . Errorf ( "appending unexpected item: want %d, have %d" , t . items , item )
}
bLen := uint32 ( len ( blob ) )
bLen := uint32 ( len ( encodedBlob ) )
if t . headBytes + bLen < bLen ||
t . headBytes + bLen > t . maxFileSize {
// we need a new file, writing would overflow
t . lock . RUnlock ( )
t . lock . Lock ( )
// Writing would overflow, so we need to open a new data file.
// If we don't already hold the writelock, abort and let the caller
// invoke this method a second time.
if ! wlock {
return true , nil
}
nextID := atomic . LoadUint32 ( & t . headId ) + 1
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it
newHead , err := t . openFile ( nextID , openFreezerFileTruncated )
if err != nil {
t . lock . Unlock ( )
return err
return false , err
}
// Close old file, and reopen in RDONLY mode
t . releaseFile ( t . headId )
@ -503,13 +527,9 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
t . head = newHead
atomic . StoreUint32 ( & t . headBytes , 0 )
atomic . StoreUint32 ( & t . headId , nextID )
t . lock . Unlock ( )
t . lock . RLock ( )
}
defer t . lock . RUnlock ( )
if _ , err := t . head . Write ( blob ) ; err != nil {
return err
if _ , err := t . head . Write ( encodedBlob ) ; err != nil {
return false , err
}
newOffset := atomic . AddUint32 ( & t . headBytes , bLen )
idx := indexEntry {
@ -523,7 +543,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
t . sizeGauge . Inc ( int64 ( bLen + indexEntrySize ) )
atomic . AddUint64 ( & t . items , 1 )
return nil
return false , nil
}
// getBounds returns the indexes for the item
@ -562,44 +582,48 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
// Retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file.
func ( t * freezerTable ) Retrieve ( item uint64 ) ( [ ] byte , error ) {
blob , err := t . retrieve ( item )
if err != nil {
return nil , err
}
if t . noCompression {
return blob , nil
}
return snappy . Decode ( nil , blob )
}
// retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file. OBS! This method does not decode
// compressed data.
func ( t * freezerTable ) retrieve ( item uint64 ) ( [ ] byte , error ) {
t . lock . RLock ( )
defer t . lock . RUnlock ( )
// Ensure the table and the item is accessible
if t . index == nil || t . head == nil {
t . lock . RUnlock ( )
return nil , errClosed
}
if atomic . LoadUint64 ( & t . items ) <= item {
t . lock . RUnlock ( )
return nil , errOutOfBounds
}
// Ensure the item was not deleted from the tail either
if uint64 ( t . itemOffset ) > item {
t . lock . RUnlock ( )
return nil , errOutOfBounds
}
startOffset , endOffset , filenum , err := t . getBounds ( item - uint64 ( t . itemOffset ) )
if err != nil {
t . lock . RUnlock ( )
return nil , err
}
dataFile , exist := t . files [ filenum ]
if ! exist {
t . lock . RUnlock ( )
return nil , fmt . Errorf ( "missing data file %d" , filenum )
}
// Retrieve the data itself, decompress and return
blob := make ( [ ] byte , endOffset - startOffset )
if _ , err := dataFile . ReadAt ( blob , int64 ( startOffset ) ) ; err != nil {
t . lock . RUnlock ( )
return nil , err
}
t . lock . RUnlock ( )
t . readMeter . Mark ( int64 ( len ( blob ) + 2 * indexEntrySize ) )
if t . noCompression {
return blob , nil
}
return snappy . Decode ( nil , blob )
return blob , nil
}
// has returns an indicator whether the specified number data