|
|
|
@ -9,13 +9,15 @@ package leveldb |
|
|
|
|
import ( |
|
|
|
|
"encoding/binary" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
|
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors" |
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/memdb" |
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/storage" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// ErrBatchCorrupted records reason of batch corruption. This error will be
// wrapped with errors.ErrCorrupted.
type ErrBatchCorrupted struct {
	Reason string // human-readable description of why the batch failed to decode
}
|
|
|
@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const (
	// batchHeaderLen is the size of the batch header: an 8-byte sequence
	// number followed by a 4-byte record count.
	batchHeaderLen = 8 + 4
	// batchGrowRec is the record-count threshold above which buffer growth
	// is amortized more aggressively (see Batch.grow).
	batchGrowRec = 3000
	batchBufioSize = 16
)
|
|
|
|
|
|
|
|
|
// BatchReplay wraps basic batch operations.
|
|
|
|
@ -39,34 +42,46 @@ type BatchReplay interface { |
|
|
|
|
Delete(key []byte) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// batchIndex records where a single record's key and value live inside a
// Batch's data buffer.
type batchIndex struct {
	keyType keyType // record kind: keyTypeVal (put) or keyTypeDel (delete)
	keyPos, keyLen int // key byte range within the data buffer
	valuePos, valueLen int // value byte range; valueLen == 0 means no value
}
|
|
|
|
|
|
|
|
|
func (index batchIndex) k(data []byte) []byte { |
|
|
|
|
return data[index.keyPos : index.keyPos+index.keyLen] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (index batchIndex) v(data []byte) []byte { |
|
|
|
|
if index.valueLen != 0 { |
|
|
|
|
return data[index.valuePos : index.valuePos+index.valueLen] |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (index batchIndex) kv(data []byte) (key, value []byte) { |
|
|
|
|
return index.k(data), index.v(data) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Batch is a write batch.
|
|
|
|
|
type Batch struct { |
|
|
|
|
data []byte |
|
|
|
|
rLen, bLen int |
|
|
|
|
seq uint64 |
|
|
|
|
sync bool |
|
|
|
|
data []byte |
|
|
|
|
index []batchIndex |
|
|
|
|
|
|
|
|
|
// internalLen is sums of key/value pair length plus 8-bytes internal key.
|
|
|
|
|
internalLen int |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) grow(n int) { |
|
|
|
|
off := len(b.data) |
|
|
|
|
if off == 0 { |
|
|
|
|
off = batchHdrLen |
|
|
|
|
if b.data != nil { |
|
|
|
|
b.data = b.data[:off] |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if cap(b.data)-off < n { |
|
|
|
|
if b.data == nil { |
|
|
|
|
b.data = make([]byte, off, off+n) |
|
|
|
|
} else { |
|
|
|
|
odata := b.data |
|
|
|
|
div := 1 |
|
|
|
|
if b.rLen > batchGrowRec { |
|
|
|
|
div = b.rLen / batchGrowRec |
|
|
|
|
} |
|
|
|
|
b.data = make([]byte, off, off+n+(off-batchHdrLen)/div) |
|
|
|
|
copy(b.data, odata) |
|
|
|
|
o := len(b.data) |
|
|
|
|
if cap(b.data)-o < n { |
|
|
|
|
div := 1 |
|
|
|
|
if len(b.index) > batchGrowRec { |
|
|
|
|
div = len(b.index) / batchGrowRec |
|
|
|
|
} |
|
|
|
|
ndata := make([]byte, o, o+n+o/div) |
|
|
|
|
copy(ndata, b.data) |
|
|
|
|
b.data = ndata |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) { |
|
|
|
|
n += binary.MaxVarintLen32 + len(value) |
|
|
|
|
} |
|
|
|
|
b.grow(n) |
|
|
|
|
off := len(b.data) |
|
|
|
|
data := b.data[:off+n] |
|
|
|
|
data[off] = byte(kt) |
|
|
|
|
off++ |
|
|
|
|
off += binary.PutUvarint(data[off:], uint64(len(key))) |
|
|
|
|
copy(data[off:], key) |
|
|
|
|
off += len(key) |
|
|
|
|
index := batchIndex{keyType: kt} |
|
|
|
|
o := len(b.data) |
|
|
|
|
data := b.data[:o+n] |
|
|
|
|
data[o] = byte(kt) |
|
|
|
|
o++ |
|
|
|
|
o += binary.PutUvarint(data[o:], uint64(len(key))) |
|
|
|
|
index.keyPos = o |
|
|
|
|
index.keyLen = len(key) |
|
|
|
|
o += copy(data[o:], key) |
|
|
|
|
if kt == keyTypeVal { |
|
|
|
|
off += binary.PutUvarint(data[off:], uint64(len(value))) |
|
|
|
|
copy(data[off:], value) |
|
|
|
|
off += len(value) |
|
|
|
|
o += binary.PutUvarint(data[o:], uint64(len(value))) |
|
|
|
|
index.valuePos = o |
|
|
|
|
index.valueLen = len(value) |
|
|
|
|
o += copy(data[o:], value) |
|
|
|
|
} |
|
|
|
|
b.data = data[:off] |
|
|
|
|
b.rLen++ |
|
|
|
|
// Include 8-byte ikey header
|
|
|
|
|
b.bLen += len(key) + len(value) + 8 |
|
|
|
|
b.data = data[:o] |
|
|
|
|
b.index = append(b.index, index) |
|
|
|
|
b.internalLen += index.keyLen + index.valueLen + 8 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Put appends 'put operation' of the given key/value pair to the batch.
// It is safe to modify the contents of the argument after Put returns but not
// before.
|
func (b *Batch) Put(key, value []byte) {
	b.appendRec(keyTypeVal, key, value) // value-carrying record
}
|
|
|
|
|
|
|
|
|
// Delete appends 'delete operation' of the given key to the batch.
// It is safe to modify the contents of the argument after Delete returns but
// not before.
func (b *Batch) Delete(key []byte) {
	b.appendRec(keyTypeDel, key, nil) // tombstone record, no value
}
|
|
|
@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) { |
|
|
|
|
// The returned slice is not its own copy, so the contents should not be
|
|
|
|
|
// modified.
|
|
|
|
|
func (b *Batch) Dump() []byte { |
|
|
|
|
return b.encode() |
|
|
|
|
return b.data |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Load loads given slice into the batch. Previous contents of the batch
|
|
|
|
@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte { |
|
|
|
|
// The given slice will not be copied and will be used as batch buffer, so
|
|
|
|
|
// it is not safe to modify the contents of the slice.
|
|
|
|
|
func (b *Batch) Load(data []byte) error { |
|
|
|
|
return b.decode(0, data) |
|
|
|
|
return b.decode(data, -1) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Replay replays batch contents.
|
|
|
|
|
func (b *Batch) Replay(r BatchReplay) error { |
|
|
|
|
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { |
|
|
|
|
switch kt { |
|
|
|
|
for _, index := range b.index { |
|
|
|
|
switch index.keyType { |
|
|
|
|
case keyTypeVal: |
|
|
|
|
r.Put(key, value) |
|
|
|
|
r.Put(index.k(b.data), index.v(b.data)) |
|
|
|
|
case keyTypeDel: |
|
|
|
|
r.Delete(key) |
|
|
|
|
r.Delete(index.k(b.data)) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Len returns number of records in the batch.
|
|
|
|
|
func (b *Batch) Len() int { |
|
|
|
|
return b.rLen |
|
|
|
|
return len(b.index) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Reset resets the batch.
|
|
|
|
|
func (b *Batch) Reset() { |
|
|
|
|
b.data = b.data[:0] |
|
|
|
|
b.seq = 0 |
|
|
|
|
b.rLen = 0 |
|
|
|
|
b.bLen = 0 |
|
|
|
|
b.sync = false |
|
|
|
|
b.index = b.index[:0] |
|
|
|
|
b.internalLen = 0 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) init(sync bool) { |
|
|
|
|
b.sync = sync |
|
|
|
|
func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error { |
|
|
|
|
for i, index := range b.index { |
|
|
|
|
if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) append(p *Batch) { |
|
|
|
|
if p.rLen > 0 { |
|
|
|
|
b.grow(len(p.data) - batchHdrLen) |
|
|
|
|
b.data = append(b.data, p.data[batchHdrLen:]...) |
|
|
|
|
b.rLen += p.rLen |
|
|
|
|
b.bLen += p.bLen |
|
|
|
|
} |
|
|
|
|
if p.sync { |
|
|
|
|
b.sync = true |
|
|
|
|
ob := len(b.data) |
|
|
|
|
oi := len(b.index) |
|
|
|
|
b.data = append(b.data, p.data...) |
|
|
|
|
b.index = append(b.index, p.index...) |
|
|
|
|
b.internalLen += p.internalLen |
|
|
|
|
|
|
|
|
|
// Updating index offset.
|
|
|
|
|
if ob != 0 { |
|
|
|
|
for ; oi < len(b.index); oi++ { |
|
|
|
|
index := &b.index[oi] |
|
|
|
|
index.keyPos += ob |
|
|
|
|
if index.valueLen != 0 { |
|
|
|
|
index.valuePos += ob |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// size returns sums of key/value pair length plus 8-bytes ikey.
|
|
|
|
|
func (b *Batch) size() int { |
|
|
|
|
return b.bLen |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) encode() []byte { |
|
|
|
|
b.grow(0) |
|
|
|
|
binary.LittleEndian.PutUint64(b.data, b.seq) |
|
|
|
|
binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) |
|
|
|
|
|
|
|
|
|
return b.data |
|
|
|
|
func (b *Batch) decode(data []byte, expectedLen int) error { |
|
|
|
|
b.data = data |
|
|
|
|
b.index = b.index[:0] |
|
|
|
|
b.internalLen = 0 |
|
|
|
|
err := decodeBatch(data, func(i int, index batchIndex) error { |
|
|
|
|
b.index = append(b.index, index) |
|
|
|
|
b.internalLen += index.keyLen + index.valueLen + 8 |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if expectedLen >= 0 && len(b.index) != expectedLen { |
|
|
|
|
return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index))) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) decode(prevSeq uint64, data []byte) error { |
|
|
|
|
if len(data) < batchHdrLen { |
|
|
|
|
return newErrBatchCorrupted("too short") |
|
|
|
|
func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { |
|
|
|
|
var ik []byte |
|
|
|
|
for i, index := range b.index { |
|
|
|
|
ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) |
|
|
|
|
if err := mdb.Put(ik, index.v(b.data)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
b.seq = binary.LittleEndian.Uint64(data) |
|
|
|
|
if b.seq < prevSeq { |
|
|
|
|
return newErrBatchCorrupted("invalid sequence number") |
|
|
|
|
} |
|
|
|
|
b.rLen = int(binary.LittleEndian.Uint32(data[8:])) |
|
|
|
|
if b.rLen < 0 { |
|
|
|
|
return newErrBatchCorrupted("invalid records length") |
|
|
|
|
func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { |
|
|
|
|
var ik []byte |
|
|
|
|
for i, index := range b.index { |
|
|
|
|
ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) |
|
|
|
|
if err := mdb.Delete(ik); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// No need to be precise at this point, it won't be used anyway
|
|
|
|
|
b.bLen = len(data) - batchHdrLen |
|
|
|
|
b.data = data |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error { |
|
|
|
|
off := batchHdrLen |
|
|
|
|
for i := 0; i < b.rLen; i++ { |
|
|
|
|
if off >= len(b.data) { |
|
|
|
|
return newErrBatchCorrupted("invalid records length") |
|
|
|
|
} |
|
|
|
|
func newBatch() interface{} { |
|
|
|
|
return &Batch{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
kt := keyType(b.data[off]) |
|
|
|
|
if kt > keyTypeVal { |
|
|
|
|
panic(kt) |
|
|
|
|
return newErrBatchCorrupted("bad record: invalid type") |
|
|
|
|
func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { |
|
|
|
|
var index batchIndex |
|
|
|
|
for i, o := 0, 0; o < len(data); i++ { |
|
|
|
|
// Key type.
|
|
|
|
|
index.keyType = keyType(data[o]) |
|
|
|
|
if index.keyType > keyTypeVal { |
|
|
|
|
return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType))) |
|
|
|
|
} |
|
|
|
|
off++ |
|
|
|
|
o++ |
|
|
|
|
|
|
|
|
|
x, n := binary.Uvarint(b.data[off:]) |
|
|
|
|
off += n |
|
|
|
|
if n <= 0 || off+int(x) > len(b.data) { |
|
|
|
|
// Key.
|
|
|
|
|
x, n := binary.Uvarint(data[o:]) |
|
|
|
|
o += n |
|
|
|
|
if n <= 0 || o+int(x) > len(data) { |
|
|
|
|
return newErrBatchCorrupted("bad record: invalid key length") |
|
|
|
|
} |
|
|
|
|
key := b.data[off : off+int(x)] |
|
|
|
|
off += int(x) |
|
|
|
|
var value []byte |
|
|
|
|
if kt == keyTypeVal { |
|
|
|
|
x, n := binary.Uvarint(b.data[off:]) |
|
|
|
|
off += n |
|
|
|
|
if n <= 0 || off+int(x) > len(b.data) { |
|
|
|
|
index.keyPos = o |
|
|
|
|
index.keyLen = int(x) |
|
|
|
|
o += index.keyLen |
|
|
|
|
|
|
|
|
|
// Value.
|
|
|
|
|
if index.keyType == keyTypeVal { |
|
|
|
|
x, n = binary.Uvarint(data[o:]) |
|
|
|
|
o += n |
|
|
|
|
if n <= 0 || o+int(x) > len(data) { |
|
|
|
|
return newErrBatchCorrupted("bad record: invalid value length") |
|
|
|
|
} |
|
|
|
|
value = b.data[off : off+int(x)] |
|
|
|
|
off += int(x) |
|
|
|
|
index.valuePos = o |
|
|
|
|
index.valueLen = int(x) |
|
|
|
|
o += index.valueLen |
|
|
|
|
} else { |
|
|
|
|
index.valuePos = 0 |
|
|
|
|
index.valueLen = 0 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := f(i, kt, key, value); err != nil { |
|
|
|
|
if err := fn(i, index); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) memReplay(to *memdb.DB) error { |
|
|
|
|
var ikScratch []byte |
|
|
|
|
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { |
|
|
|
|
ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) |
|
|
|
|
return to.Put(ikScratch, value) |
|
|
|
|
func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) { |
|
|
|
|
seq, batchLen, err = decodeBatchHeader(data) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, err |
|
|
|
|
} |
|
|
|
|
if seq < expectSeq { |
|
|
|
|
return 0, 0, newErrBatchCorrupted("invalid sequence number") |
|
|
|
|
} |
|
|
|
|
data = data[batchHeaderLen:] |
|
|
|
|
var ik []byte |
|
|
|
|
var decodedLen int |
|
|
|
|
err = decodeBatch(data, func(i int, index batchIndex) error { |
|
|
|
|
if i >= batchLen { |
|
|
|
|
return newErrBatchCorrupted("invalid records length") |
|
|
|
|
} |
|
|
|
|
ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType) |
|
|
|
|
if err := mdb.Put(ik, index.v(data)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
decodedLen++ |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
if err == nil && decodedLen != batchLen { |
|
|
|
|
err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen)) |
|
|
|
|
} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { |
|
|
|
|
if err := b.decode(prevSeq, data); err != nil { |
|
|
|
|
return err |
|
|
|
|
func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte { |
|
|
|
|
dst = ensureBuffer(dst, batchHeaderLen) |
|
|
|
|
binary.LittleEndian.PutUint64(dst, seq) |
|
|
|
|
binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen)) |
|
|
|
|
return dst |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) { |
|
|
|
|
if len(data) < batchHeaderLen { |
|
|
|
|
return 0, 0, newErrBatchCorrupted("too short") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
seq = binary.LittleEndian.Uint64(data) |
|
|
|
|
batchLen = int(binary.LittleEndian.Uint32(data[8:])) |
|
|
|
|
if batchLen < 0 { |
|
|
|
|
return 0, 0, newErrBatchCorrupted("invalid records length") |
|
|
|
|
} |
|
|
|
|
return b.memReplay(to) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Batch) revertMemReplay(to *memdb.DB) error { |
|
|
|
|
var ikScratch []byte |
|
|
|
|
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { |
|
|
|
|
ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) |
|
|
|
|
return to.Delete(ikScratch) |
|
|
|
|
}) |
|
|
|
|
func batchesLen(batches []*Batch) int { |
|
|
|
|
batchLen := 0 |
|
|
|
|
for _, batch := range batches { |
|
|
|
|
batchLen += batch.Len() |
|
|
|
|
} |
|
|
|
|
return batchLen |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error { |
|
|
|
|
if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
for _, batch := range batches { |
|
|
|
|
if _, err := wr.Write(batch.data); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|