|
|
|
@ -20,6 +20,7 @@ import ( |
|
|
|
|
"encoding/binary" |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
"os" |
|
|
|
|
"path/filepath" |
|
|
|
|
"sync" |
|
|
|
@ -106,6 +107,44 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr |
|
|
|
|
return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// openFreezerFileForAppend opens a freezer table file for reading and writing,
// creating it if it does not exist, and positions the file offset at the end so
// that subsequent writes append to the existing content.
//
// On failure a nil file is returned together with the error reported by the
// underlying open or seek operation.
func openFreezerFileForAppend(filename string) (*os.File, error) {
	// Open the file without the O_APPEND flag because it has differing
	// behaviour during Truncate operations on different OS's. Appending is
	// emulated by explicitly seeking to the end instead.
	file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	// Seek to end for append
	if _, err = file.Seek(0, io.SeekEnd); err != nil {
		// The handle is not handed to the caller on this path, so it has to
		// be released here or the descriptor leaks. The seek error is the
		// one worth reporting; a failure to close is deliberately ignored.
		_ = file.Close()
		return nil, err
	}
	return file, nil
}
|
|
|
|
|
|
|
|
|
// openFreezerFileForReadOnly opens an existing freezer table file strictly for
// reading. The file is never created: a missing file is reported as an error.
func openFreezerFileForReadOnly(filename string) (*os.File, error) {
	// os.Open is the read-only shorthand for OpenFile with O_RDONLY; the
	// permission bits are irrelevant because nothing is ever created here.
	return os.Open(filename)
}
|
|
|
|
|
|
|
|
|
// openFreezerFileTruncated opens a freezer table file for reading and writing
// and guarantees it starts out empty: a missing file is created, an existing
// one has its previous content discarded.
func openFreezerFileTruncated(filename string) (*os.File, error) {
	const truncatedFlags = os.O_RDWR | os.O_CREATE | os.O_TRUNC

	return os.OpenFile(filename, truncatedFlags, 0644)
}
|
|
|
|
|
|
|
|
|
// truncateFreezerFile changes the size of an open freezer table file to size
// bytes and then moves the file offset to the new end, so that the next write
// appends directly after the retained data.
//
// The first error encountered (from the resize or from the seek) is returned;
// the seek is not attempted if the resize fails.
func truncateFreezerFile(file *os.File, size int64) error {
	err := file.Truncate(size)
	if err == nil {
		// Seek to end for append
		_, err = file.Seek(0, io.SeekEnd)
	}
	return err
}
|
|
|
|
|
|
|
|
|
// newCustomTable opens a freezer table, creating the data and index files if they are
|
|
|
|
|
// non existent. Both files are truncated to the shortest common length to ensure
|
|
|
|
|
// they don't go out of sync.
|
|
|
|
@ -116,13 +155,13 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete |
|
|
|
|
} |
|
|
|
|
var idxName string |
|
|
|
|
if noCompression { |
|
|
|
|
// raw idx
|
|
|
|
|
// Raw idx
|
|
|
|
|
idxName = fmt.Sprintf("%s.ridx", name) |
|
|
|
|
} else { |
|
|
|
|
// compressed idx
|
|
|
|
|
// Compressed idx
|
|
|
|
|
idxName = fmt.Sprintf("%s.cidx", name) |
|
|
|
|
} |
|
|
|
|
offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) |
|
|
|
|
offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
@ -163,7 +202,7 @@ func (t *freezerTable) repair() error { |
|
|
|
|
} |
|
|
|
|
// Ensure the index is a multiple of indexEntrySize bytes
|
|
|
|
|
if overflow := stat.Size() % indexEntrySize; overflow != 0 { |
|
|
|
|
t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path
|
|
|
|
|
truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path
|
|
|
|
|
} |
|
|
|
|
// Retrieve the file sizes and prepare for truncation
|
|
|
|
|
if stat, err = t.index.Stat(); err != nil { |
|
|
|
@ -188,7 +227,7 @@ func (t *freezerTable) repair() error { |
|
|
|
|
|
|
|
|
|
t.index.ReadAt(buffer, offsetsSize-indexEntrySize) |
|
|
|
|
lastIndex.unmarshalBinary(buffer) |
|
|
|
|
t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) |
|
|
|
|
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -204,7 +243,7 @@ func (t *freezerTable) repair() error { |
|
|
|
|
// Truncate the head file to the last offset pointer
|
|
|
|
|
if contentExp < contentSize { |
|
|
|
|
t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) |
|
|
|
|
if err := t.head.Truncate(contentExp); err != nil { |
|
|
|
|
if err := truncateFreezerFile(t.head, contentExp); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
contentSize = contentExp |
|
|
|
@ -212,7 +251,7 @@ func (t *freezerTable) repair() error { |
|
|
|
|
// Truncate the index to point within the head file
|
|
|
|
|
if contentExp > contentSize { |
|
|
|
|
t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) |
|
|
|
|
if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil { |
|
|
|
|
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
offsetsSize -= indexEntrySize |
|
|
|
@ -221,9 +260,9 @@ func (t *freezerTable) repair() error { |
|
|
|
|
newLastIndex.unmarshalBinary(buffer) |
|
|
|
|
// We might have slipped back into an earlier head-file here
|
|
|
|
|
if newLastIndex.filenum != lastIndex.filenum { |
|
|
|
|
// release earlier opened file
|
|
|
|
|
// Release earlier opened file
|
|
|
|
|
t.releaseFile(lastIndex.filenum) |
|
|
|
|
t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) |
|
|
|
|
t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend) |
|
|
|
|
if stat, err = t.head.Stat(); err != nil { |
|
|
|
|
// TODO, anything more we can do here?
|
|
|
|
|
// A data file has gone missing...
|
|
|
|
@ -264,16 +303,16 @@ func (t *freezerTable) preopen() (err error) { |
|
|
|
|
t.releaseFilesAfter(0, false) |
|
|
|
|
// Open all except head in RDONLY
|
|
|
|
|
for i := t.tailId; i < t.headId; i++ { |
|
|
|
|
if _, err = t.openFile(i, os.O_RDONLY); err != nil { |
|
|
|
|
if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Open head in read/write
|
|
|
|
|
t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND) |
|
|
|
|
t.head, err = t.openFile(t.headId, openFreezerFileForAppend) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// truncate discards any recent data above the provided threashold number.
|
|
|
|
|
// truncate discards any recent data above the provided threshold number.
|
|
|
|
|
func (t *freezerTable) truncate(items uint64) error { |
|
|
|
|
t.lock.Lock() |
|
|
|
|
defer t.lock.Unlock() |
|
|
|
@ -284,7 +323,7 @@ func (t *freezerTable) truncate(items uint64) error { |
|
|
|
|
} |
|
|
|
|
// Something's out of sync, truncate the table's offset index
|
|
|
|
|
t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) |
|
|
|
|
if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil { |
|
|
|
|
if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Calculate the new expected size of the data file and truncate it
|
|
|
|
@ -299,18 +338,18 @@ func (t *freezerTable) truncate(items uint64) error { |
|
|
|
|
if expected.filenum != t.headId { |
|
|
|
|
// If already open for reading, force-reopen for writing
|
|
|
|
|
t.releaseFile(expected.filenum) |
|
|
|
|
newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) |
|
|
|
|
newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// release any files _after the current head -- both the previous head
|
|
|
|
|
// Release any files _after the current head -- both the previous head
|
|
|
|
|
// and any files which may have been opened for reading
|
|
|
|
|
t.releaseFilesAfter(expected.filenum, true) |
|
|
|
|
// set back the historic head
|
|
|
|
|
// Set back the historic head
|
|
|
|
|
t.head = newHead |
|
|
|
|
atomic.StoreUint32(&t.headId, expected.filenum) |
|
|
|
|
} |
|
|
|
|
if err := t.head.Truncate(int64(expected.offset)); err != nil { |
|
|
|
|
if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// All data files truncated, set internal counters and return
|
|
|
|
@ -344,7 +383,7 @@ func (t *freezerTable) Close() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// openFile assumes that the write-lock is held by the caller
|
|
|
|
|
func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { |
|
|
|
|
func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) { |
|
|
|
|
var exist bool |
|
|
|
|
if f, exist = t.files[num]; !exist { |
|
|
|
|
var name string |
|
|
|
@ -353,7 +392,7 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { |
|
|
|
|
} else { |
|
|
|
|
name = fmt.Sprintf("%s.%04d.cdat", t.name, num) |
|
|
|
|
} |
|
|
|
|
f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644) |
|
|
|
|
f, err = opener(filepath.Join(t.path, name)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
@ -413,28 +452,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { |
|
|
|
|
// we need a new file, writing would overflow
|
|
|
|
|
t.lock.RUnlock() |
|
|
|
|
t.lock.Lock() |
|
|
|
|
nextId := atomic.LoadUint32(&t.headId) + 1 |
|
|
|
|
nextID := atomic.LoadUint32(&t.headId) + 1 |
|
|
|
|
// We open the next file in truncated mode -- if this file already
|
|
|
|
|
// exists, we need to start over from scratch on it
|
|
|
|
|
newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC) |
|
|
|
|
newHead, err := t.openFile(nextID, openFreezerFileTruncated) |
|
|
|
|
if err != nil { |
|
|
|
|
t.lock.Unlock() |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Close old file, and reopen in RDONLY mode
|
|
|
|
|
t.releaseFile(t.headId) |
|
|
|
|
t.openFile(t.headId, os.O_RDONLY) |
|
|
|
|
t.openFile(t.headId, openFreezerFileForReadOnly) |
|
|
|
|
|
|
|
|
|
// Swap out the current head
|
|
|
|
|
t.head = newHead |
|
|
|
|
atomic.StoreUint32(&t.headBytes, 0) |
|
|
|
|
atomic.StoreUint32(&t.headId, nextId) |
|
|
|
|
atomic.StoreUint32(&t.headId, nextID) |
|
|
|
|
t.lock.Unlock() |
|
|
|
|
t.lock.RLock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
defer t.lock.RUnlock() |
|
|
|
|
|
|
|
|
|
if _, err := t.head.Write(blob); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|