From 80bdab757dfb0f6d73fb869d834979536fe474e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Fri, 25 Oct 2024 17:33:46 +0200 Subject: [PATCH] ethdb: add DeleteRange feature (#30668) This PR adds `DeleteRange` to `ethdb.KeyValueWriter`. While range deletion using an iterator can be really slow, `DeleteRange` is natively supported by pebble and apparently runs in O(1) time (typically 20-30ms in my tests for removing hundreds of millions of keys and gigabytes of data). For leveldb and memorydb an iterator based fallback is implemented. Note that since the iterator method can be slow and a database function should not unexpectedly block for a very long time, the number of deleted keys is limited at 10000 which should ensure that it does not block for more than a second. ErrTooManyKeys is returned if the range has only been partially deleted. In this case the caller can repeat the call until it finally succeeds. --- core/rawdb/table.go | 6 +++ ethdb/database.go | 9 +++++ ethdb/dbtest/testsuite.go | 82 ++++++++++++++++++++++++++++++++++++++ ethdb/leveldb/leveldb.go | 31 ++++++++++++++ ethdb/memorydb/memorydb.go | 15 +++++++ ethdb/pebble/pebble.go | 13 +++++- ethdb/remotedb/remotedb.go | 4 ++ trie/trie_test.go | 1 + trie/trienode/proof.go | 4 ++ 9 files changed, 164 insertions(+), 1 deletion(-) diff --git a/core/rawdb/table.go b/core/rawdb/table.go index bc1d354d10..1a9060b636 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -129,6 +129,12 @@ func (t *table) Delete(key []byte) error { return t.db.Delete(append([]byte(t.prefix), key...)) } +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +func (t *table) DeleteRange(start, end []byte) error { + return t.db.DeleteRange(append([]byte(t.prefix), start...), append([]byte(t.prefix), end...)) +} + // NewIterator creates a binary-alphabetical iterator over a subset // of database content with a particular key prefix, starting at a particular // initial key (or after, if it does not exist). diff --git a/ethdb/database.go b/ethdb/database.go index c6e76fd2fe..9bf4427293 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -37,6 +37,13 @@ type KeyValueWriter interface { Delete(key []byte) error } +// KeyValueRangeDeleter wraps the DeleteRange method of a backing data store. +type KeyValueRangeDeleter interface { + // DeleteRange deletes all of the keys (and values) in the range [start,end) + // (inclusive on start, exclusive on end). + DeleteRange(start, end []byte) error +} + // KeyValueStater wraps the Stat method of a backing data store. type KeyValueStater interface { // Stat returns the statistic data of the database. @@ -61,6 +68,7 @@ type KeyValueStore interface { KeyValueReader KeyValueWriter KeyValueStater + KeyValueRangeDeleter Batcher Iteratee Compacter @@ -158,6 +166,7 @@ type Reader interface { // immutable ancient data. type Writer interface { KeyValueWriter + KeyValueRangeDeleter AncientWriter } diff --git a/ethdb/dbtest/testsuite.go b/ethdb/dbtest/testsuite.go index 1af55a0e38..52e6b287cf 100644 --- a/ethdb/dbtest/testsuite.go +++ b/ethdb/dbtest/testsuite.go @@ -21,6 +21,7 @@ import ( "crypto/rand" "slices" "sort" + "strconv" "testing" "github.com/ethereum/go-ethereum/ethdb" @@ -343,6 +344,64 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) { t.Fatalf("expected error on batch.Write after Close") } }) + + t.Run("DeleteRange", func(t *testing.T) { + db := New() + defer db.Close() + + addRange := func(start, stop int) { + for i := start; i <= stop; i++ { + db.Put([]byte(strconv.Itoa(i)), nil) + } + } + + checkRange := func(start, stop int, exp bool) { + for i := start; i <= stop; i++ { + has, _ := db.Has([]byte(strconv.Itoa(i))) + if has && !exp { + t.Fatalf("unexpected key %d", i) + } + if !has && exp { + t.Fatalf("missing expected key %d", i) + } + } + } + + addRange(1, 9) + db.DeleteRange([]byte("9"), []byte("1")) + checkRange(1, 9, true) + db.DeleteRange([]byte("5"), []byte("5")) + checkRange(1, 9, true) + db.DeleteRange([]byte("5"), []byte("50")) + checkRange(1, 4, true) + checkRange(5, 5, false) + checkRange(6, 9, true) + db.DeleteRange([]byte(""), []byte("a")) + checkRange(1, 9, false) + + addRange(1, 999) + db.DeleteRange([]byte("12345"), []byte("54321")) + checkRange(1, 1, true) + checkRange(2, 5, false) + checkRange(6, 12, true) + checkRange(13, 54, false) + checkRange(55, 123, true) + checkRange(124, 543, false) + checkRange(544, 999, true) + + addRange(1, 999) + db.DeleteRange([]byte("3"), []byte("7")) + checkRange(1, 2, true) + checkRange(3, 6, false) + checkRange(7, 29, true) + checkRange(30, 69, false) + checkRange(70, 299, true) + checkRange(300, 699, false) + checkRange(700, 999, true) + + db.DeleteRange([]byte(""), []byte("a")) + checkRange(1, 999, false) + }) } // BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database @@ -438,6 +497,29 @@ func BenchDatabaseSuite(b *testing.B, New func() ethdb.KeyValueStore) { benchBatchWrite(b, keys, vals) }) }) + b.Run("DeleteRange", func(b *testing.B) { + benchDeleteRange := func(b *testing.B, count int) { + db := New() + defer db.Close() + + for i := 0; i < count; i++ { + db.Put([]byte(strconv.Itoa(i)), nil) + } + b.ResetTimer() + b.ReportAllocs() + + db.DeleteRange([]byte("0"), []byte("999999999")) + } + b.Run("DeleteRange100", func(b *testing.B) { + benchDeleteRange(b, 100) + }) + b.Run("DeleteRange1k", func(b *testing.B) { + benchDeleteRange(b, 1000) + }) + b.Run("DeleteRange10k", func(b *testing.B) { + benchDeleteRange(b, 10000) + }) + }) } func iterateKeys(it ethdb.Iterator) []string { diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 24925a4f04..ce7d823561 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -21,6 +21,7 @@ package leveldb import ( + "bytes" "fmt" "sync" "time" @@ -206,6 +207,36 @@ func (db *Database) Delete(key []byte) error { return db.db.Delete(key, nil) } +var ErrTooManyKeys = errors.New("too many keys in deleted range") + +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +// Note that this is a fallback implementation as leveldb does not natively +// support range deletion. It can be slow and therefore the number of deleted +// keys is limited in order to avoid blocking for a very long time. +// ErrTooManyKeys is returned if the range has only been partially deleted. +// In this case the caller can repeat the call until it finally succeeds. +func (db *Database) DeleteRange(start, end []byte) error { + batch := db.NewBatch() + it := db.NewIterator(nil, start) + defer it.Release() + + var count int + for it.Next() && bytes.Compare(end, it.Key()) > 0 { + count++ + if count > 10000 { // should not block for more than a second + if err := batch.Write(); err != nil { + return err + } + return ErrTooManyKeys + } + if err := batch.Delete(it.Key()); err != nil { + return err + } + } + return batch.Write() +} + // NewBatch creates a write-only key-value store that buffers changes to its host // database until a final write is called. func (db *Database) NewBatch() ethdb.Batch { diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index 532e0dfe3f..c6fba39cfd 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -18,6 +18,7 @@ package memorydb import ( + "bytes" "errors" "sort" "strings" @@ -121,6 +122,20 @@ func (db *Database) Delete(key []byte) error { return nil } +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +func (db *Database) DeleteRange(start, end []byte) error { + it := db.NewIterator(nil, start) + defer it.Release() + + for it.Next() && bytes.Compare(end, it.Key()) > 0 { + if err := db.Delete(it.Key()); err != nil { + return err + } + } + return nil +} + // NewBatch creates a write-only key-value store that buffers changes to its host // database until a final write is called. func (db *Database) NewBatch() ethdb.Batch { diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index e2ba9b8c7b..a9151a3bb5 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -335,7 +335,18 @@ func (d *Database) Delete(key []byte) error { if d.closed { return pebble.ErrClosed } - return d.db.Delete(key, nil) + return d.db.Delete(key, d.writeOptions) +} + +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +func (d *Database) DeleteRange(start, end []byte) error { + d.quitLock.RLock() + defer d.quitLock.RUnlock() + if d.closed { + return pebble.ErrClosed + } + return d.db.DeleteRange(start, end, d.writeOptions) } // NewBatch creates a write-only key-value store that buffers changes to its host diff --git a/ethdb/remotedb/remotedb.go b/ethdb/remotedb/remotedb.go index d0f018cb01..247a4392db 100644 --- a/ethdb/remotedb/remotedb.go +++ b/ethdb/remotedb/remotedb.go @@ -94,6 +94,10 @@ func (db *Database) Delete(key []byte) error { panic("not supported") } +func (db *Database) DeleteRange(start, end []byte) error { + panic("not supported") +} + func (db *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { panic("not supported") } diff --git a/trie/trie_test.go b/trie/trie_test.go index 9b2530bdd4..423ed30fe8 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -819,6 +819,7 @@ type spongeDb struct { func (s *spongeDb) Has(key []byte) (bool, error) { panic("implement me") } func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, errors.New("no such elem") } func (s *spongeDb) Delete(key []byte) error { panic("implement me") } +func (s *spongeDb) DeleteRange(start, end []byte) error { panic("implement me") } func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBatch{s} } func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} } func (s *spongeDb) Stat() (string, error) { panic("implement me") } diff --git a/trie/trienode/proof.go b/trie/trienode/proof.go index d3075ecccf..01a07c05b0 100644 --- a/trie/trienode/proof.go +++ b/trie/trienode/proof.go @@ -69,6 +69,10 @@ func (db *ProofSet) Delete(key []byte) error { return nil } +func (db *ProofSet) DeleteRange(start, end []byte) error { + panic("not supported") +} + // Get returns a stored node func (db *ProofSet) Get(key []byte) ([]byte, error) { db.lock.RLock()