vendor: update leveldb upstream which include a compaction fix (#19163)

pull/19166/head
gary rong 6 years ago committed by Péter Szilágyi
parent f0233948d2
commit 7ebd2fa5db
  1. 28
      vendor/github.com/syndtr/goleveldb/leveldb/db.go
  2. 2
      vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
  3. 4
      vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go
  4. 6
      vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go
  5. 6
      vendor/github.com/syndtr/goleveldb/leveldb/session.go
  6. 8
      vendor/github.com/syndtr/goleveldb/leveldb/table.go
  7. 41
      vendor/github.com/syndtr/goleveldb/leveldb/version.go
  8. 6
      vendor/vendor.json

@ -468,7 +468,7 @@ func recoverTable(s *session, o *opt.Options) error {
} }
// Commit. // Commit.
return s.commit(rec) return s.commit(rec, false)
} }
func (db *DB) recoverJournal() error { func (db *DB) recoverJournal() error {
@ -538,7 +538,7 @@ func (db *DB) recoverJournal() error {
rec.setJournalNum(fd.Num) rec.setJournalNum(fd.Num)
rec.setSeqNum(db.seq) rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil { if err := db.s.commit(rec, false); err != nil {
fr.Close() fr.Close()
return err return err
} }
@ -617,7 +617,7 @@ func (db *DB) recoverJournal() error {
// Commit. // Commit.
rec.setJournalNum(db.journalFd.Num) rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.seq) rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil { if err := db.s.commit(rec, false); err != nil {
// Close journal on error. // Close journal on error.
if db.journal != nil { if db.journal != nil {
db.journal.Close() db.journal.Close()
@ -872,6 +872,10 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
@ -953,15 +957,27 @@ func (db *DB) GetProperty(name string) (value string, err error) {
value = "Compactions\n" + value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n" "-------+------------+---------------+---------------+---------------+---------------\n"
var totalTables int
var totalSize, totalRead, totalWrite int64
var totalDuration time.Duration
for level, tables := range v.levels { for level, tables := range v.levels {
duration, read, write := db.compStats.getStat(level) duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 { if len(tables) == 0 && duration == 0 {
continue continue
} }
totalTables += len(tables)
totalSize += tables.size()
totalRead += read
totalWrite += write
totalDuration += duration
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(), level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
float64(read)/1048576.0, float64(write)/1048576.0) float64(read)/1048576.0, float64(write)/1048576.0)
} }
value += "-------+------------+---------------+---------------+---------------+---------------\n"
value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
case p == "iostats": case p == "iostats":
value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f", value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
float64(db.s.stor.reads())/1048576.0, float64(db.s.stor.reads())/1048576.0,
@ -1013,10 +1029,10 @@ type DBStats struct {
BlockCacheSize int BlockCacheSize int
OpenedTablesCount int OpenedTablesCount int
LevelSizes []int64 LevelSizes Sizes
LevelTablesCounts []int LevelTablesCounts []int
LevelRead []int64 LevelRead Sizes
LevelWrite []int64 LevelWrite Sizes
LevelDurations []time.Duration LevelDurations []time.Duration
} }

@ -260,7 +260,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) {
db.compCommitLk.Lock() db.compCommitLk.Lock()
defer db.compCommitLk.Unlock() // Defer is necessary. defer db.compCommitLk.Unlock() // Defer is necessary.
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
return db.s.commit(rec) return db.s.commit(rec, true)
}, nil) }, nil)
} }

@ -142,6 +142,10 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Value() methods), its content should not be
// modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// Releasing the snapshot doesn't mean releasing the iterator too, the // Releasing the snapshot doesn't mean releasing the iterator too, the
// iterator would be still valid until released. // iterator would be still valid until released.

@ -69,6 +69,10 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
@ -205,7 +209,7 @@ func (tr *Transaction) Commit() error {
tr.stats.startTimer() tr.stats.startTimer()
var cerr error var cerr error
for retry := 0; retry < 3; retry++ { for retry := 0; retry < 3; retry++ {
cerr = tr.db.s.commit(&tr.rec) cerr = tr.db.s.commit(&tr.rec, false)
if cerr != nil { if cerr != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, cerr) tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select { select {

@ -180,19 +180,19 @@ func (s *session) recover() (err error) {
} }
s.manifestFd = fd s.manifestFd = fd
s.setVersion(staging.finish()) s.setVersion(staging.finish(false))
s.setNextFileNum(rec.nextFileNum) s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec) s.recordCommited(rec)
return nil return nil
} }
// Commit session; need external synchronization. // Commit session; need external synchronization.
func (s *session) commit(r *sessionRecord) (err error) { func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
v := s.version() v := s.version()
defer v.release() defer v.release()
// spawn new version based on current version // spawn new version based on current version
nv := v.spawn(r) nv := v.spawn(r, trivial)
if s.manifest == nil { if s.manifest == nil {
// manifest journal writer not yet created, create one // manifest journal writer not yet created, create one

@ -150,6 +150,14 @@ func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
}) })
} }
// Searches smallest index of tables whose its file number
// is smaller than the given number.
func (tf tFiles) searchNumLess(num int64) int {
return sort.Search(len(tf), func(i int) bool {
return tf[i].fd.Num < num
})
}
// Returns true if given key range overlaps with one or more // Returns true if given key range overlaps with one or more
// tables key range. If unsorted is true then binary search will not be used. // tables key range. If unsorted is true then binary search will not be used.
func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {

@ -273,10 +273,10 @@ func (v *version) newStaging() *versionStaging {
} }
// Spawn a new version based on this version. // Spawn a new version based on this version.
func (v *version) spawn(r *sessionRecord) *version { func (v *version) spawn(r *sessionRecord, trivial bool) *version {
staging := v.newStaging() staging := v.newStaging()
staging.commit(r) staging.commit(r)
return staging.finish() return staging.finish(trivial)
} }
func (v *version) fillRecord(r *sessionRecord) { func (v *version) fillRecord(r *sessionRecord) {
@ -446,7 +446,7 @@ func (p *versionStaging) commit(r *sessionRecord) {
} }
} }
func (p *versionStaging) finish() *version { func (p *versionStaging) finish(trivial bool) *version {
// Build new version. // Build new version.
nv := newVersion(p.base.s) nv := newVersion(p.base.s)
numLevel := len(p.levels) numLevel := len(p.levels)
@ -463,6 +463,12 @@ func (p *versionStaging) finish() *version {
if level < len(p.levels) { if level < len(p.levels) {
scratch := p.levels[level] scratch := p.levels[level]
// Short circuit if there is no change at all.
if len(scratch.added) == 0 && len(scratch.deleted) == 0 {
nv.levels[level] = baseTabels
continue
}
var nt tFiles var nt tFiles
// Prealloc list if possible. // Prealloc list if possible.
if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 { if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
@ -480,6 +486,35 @@ func (p *versionStaging) finish() *version {
nt = append(nt, t) nt = append(nt, t)
} }
// For normal table compaction, one compaction will only involve two levels
// of files. And the new files generated after merging the source level and
// source+1 level related files can be inserted as a whole into source+1 level
// without any overlap with the other source+1 files.
//
// When the amount of data maintained by leveldb is large, the number of files
// per level will be very large. While qsort is very inefficient for sorting
// already ordered arrays. Therefore, for the normal table compaction, we use
// binary search here to find the insert index to insert a batch of new added
// files directly instead of using qsort.
if trivial && len(scratch.added) > 0 {
added := make(tFiles, 0, len(scratch.added))
for _, r := range scratch.added {
added = append(added, tableFileFromRecord(r))
}
if level == 0 {
added.sortByNum()
index := nt.searchNumLess(added[len(added)-1].fd.Num)
nt = append(nt[:index], append(added, nt[index:]...)...)
} else {
added.sortByKey(p.base.s.icmp)
_, amax := added.getRange(p.base.s.icmp)
index := nt.searchMin(p.base.s.icmp, amax)
nt = append(nt[:index], append(added, nt[index:]...)...)
}
nv.levels[level] = nt
continue
}
// New tables. // New tables.
for _, r := range scratch.added { for _, r := range scratch.added {
nt = append(nt, tableFileFromRecord(r)) nt = append(nt, tableFileFromRecord(r))

@ -455,10 +455,10 @@
"revisionTime": "2017-07-05T02:17:15Z" "revisionTime": "2017-07-05T02:17:15Z"
}, },
{ {
"checksumSHA1": "LV0VMVON7xY1ttV+s2ph83ntmDQ=", "checksumSHA1": "4DuP8qJfeXFfdbcl4wr7l1VppcY=",
"path": "github.com/syndtr/goleveldb/leveldb", "path": "github.com/syndtr/goleveldb/leveldb",
"revision": "b001fa50d6b27f3f0bb175a87d0cb55426d0a0ae", "revision": "4217c9f31f5816db02addc94e56061da77f288d8",
"revisionTime": "2018-11-28T10:09:59Z" "revisionTime": "2019-02-26T15:37:22Z"
}, },
{ {
"checksumSHA1": "mPNraL2edpk/2FYq26rSXfMHbJg=", "checksumSHA1": "mPNraL2edpk/2FYq26rSXfMHbJg=",

Loading…
Cancel
Save