core: journal the snapshot inside leveldb, not a flat file

pull/20152/head
Péter Szilágyi 5 years ago
parent d5d7c0c24b
commit fd39f722a3
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
  1. 4
      core/blockchain.go
  2. 23
      core/rawdb/accessors_snapshot.go
  3. 5
      core/rawdb/schema.go
  4. 5
      core/state/snapshot/difflayer_test.go
  5. 70
      core/state/snapshot/journal.go
  6. 22
      core/state/snapshot/snapshot.go
  7. 4
      core/state/statedb.go

@ -302,7 +302,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
// Load any existing snapshot, regenerating it if loading failed
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), "snapshot.rlp", bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root())
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root())
// Take ownership of this particular state
go bc.update()
@ -854,7 +854,7 @@ func (bc *BlockChain) Stop() {
bc.wg.Wait()
// Ensure that the entirety of the state snapshot is journalled to disk.
snapBase, err := bc.snaps.Journal(bc.CurrentBlock().Root(), "snapshot.rlp")
snapBase, err := bc.snaps.Journal(bc.CurrentBlock().Root())
if err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}

@ -95,3 +95,26 @@ func DeleteStorageSnapshot(db ethdb.KeyValueWriter, accountHash, storageHash com
func IterateStorageSnapshots(db ethdb.Iteratee, accountHash common.Hash) ethdb.Iterator {
return db.NewIteratorWithPrefix(storageSnapshotsKey(accountHash))
}
// ReadSnapshotJournal retrieves the serialized in-memory diff layers saved at
// the last shutdown. The blob is expected to be max a few 10s of megabytes.
func ReadSnapshotJournal(db ethdb.KeyValueReader) []byte {
data, _ := db.Get(snapshotJournalKey)
return data
}
// WriteSnapshotJournal stores the serialized in-memory diff layers to save at
// shutdown. The blob is expected to be max a few 10s of megabytes.
func WriteSnapshotJournal(db ethdb.KeyValueWriter, journal []byte) {
if err := db.Put(snapshotJournalKey, journal); err != nil {
log.Crit("Failed to store snapshot journal", "err", err)
}
}
// DeleteSnapshotJournal deletes the serialized in-memory diff layers saved at
// the last shutdown
func DeleteSnapshotJournal(db ethdb.KeyValueWriter) {
if err := db.Delete(snapshotJournalKey); err != nil {
log.Crit("Failed to remove snapshot journal", "err", err)
}
}

@ -41,9 +41,12 @@ var (
// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
fastTrieProgressKey = []byte("TrieSync")
// snapshotRootKey tracks the number and hash of the last snapshot.
// snapshotRootKey tracks the hash of the last snapshot.
snapshotRootKey = []byte("SnapshotRoot")
// snapshotJournalKey tracks the in-memory diff layers across restarts.
snapshotJournalKey = []byte("SnapshotJournal")
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td

@ -20,8 +20,6 @@ import (
"bytes"
"math/big"
"math/rand"
"os"
"path"
"testing"
"github.com/VictoriaMetrics/fastcache"
@ -343,7 +341,6 @@ func BenchmarkJournal(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
f, _, _ := layer.Journal(path.Join(os.TempDir(), "difflayer_journal.tmp"))
f.Close()
layer.Journal(new(bytes.Buffer))
}
}

@ -17,12 +17,11 @@
package snapshot
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"time"
"github.com/VictoriaMetrics/fastcache"
@ -58,7 +57,7 @@ type journalStorage struct {
}
// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cache int, root common.Hash) (snapshot, error) {
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash) (snapshot, error) {
// Retrieve the block number and hash of the snapshot, failing if no snapshot
// is present in the database (or crashed mid-update).
baseRoot := rawdb.ReadSnapshotRoot(diskdb)
@ -71,13 +70,13 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal str
cache: fastcache.New(cache * 1024 * 1024),
root: baseRoot,
}
// Open the journal, it must exist since even for 0 layer it stores whether
// Retrieve the journal, it must exist since even for 0 layer it stores whether
// we've already generated the snapshot or are in progress only
file, err := os.Open(journal)
if err != nil {
return nil, err
journal := rawdb.ReadSnapshotJournal(diskdb)
if len(journal) == 0 {
return nil, errors.New("missing or corrupted snapshot journal")
}
r := rlp.NewStream(file, 0)
r := rlp.NewStream(bytes.NewReader(journal), 0)
// Read the snapshot generation progress for the disk layer
var generator journalGenerator
@ -162,9 +161,9 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
return loadDiffLayer(newDiffLayer(parent, root, accountData, storageData), r)
}
// Journal is the internal version of Journal that also returns the journal file
// so subsequent layers know where to write to.
func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
// Journal writes the persistent layer generator stats into a buffer to be stored
// in the database as the snapshot journal.
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
// If the snapshot is currenty being generated, abort it
var stats *generatorStats
if dl.genAbort != nil {
@ -180,12 +179,7 @@ func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
defer dl.lock.RUnlock()
if dl.stale {
return nil, common.Hash{}, ErrSnapshotStale
}
// We've reached the bottom, open the journal
file, err := os.Create(path)
if err != nil {
return nil, common.Hash{}, err
return common.Hash{}, ErrSnapshotStale
}
// Write out the generator marker
entry := journalGenerator{
@ -198,44 +192,37 @@ func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
if err := rlp.Encode(file, entry); err != nil {
file.Close()
return nil, common.Hash{}, err
if err := rlp.Encode(buffer, entry); err != nil {
return common.Hash{}, err
}
return file, dl.root, nil
return dl.root, nil
}
// Journal is the internal version of Journal that also returns the journal file
// so subsequent layers know where to write to.
func (dl *diffLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
// Journal writes the memory layer contents into a buffer to be stored in the
// database as the snapshot journal.
func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
// Journal the parent first
writer, base, err := dl.parent.Journal(path)
base, err := dl.parent.Journal(buffer)
if err != nil {
return nil, common.Hash{}, err
return common.Hash{}, err
}
// Ensure the layer didn't get stale
dl.lock.RLock()
defer dl.lock.RUnlock()
if dl.stale {
writer.Close()
return nil, common.Hash{}, ErrSnapshotStale
return common.Hash{}, ErrSnapshotStale
}
// Everything below was journalled, persist this layer too
buf := bufio.NewWriter(writer)
if err := rlp.Encode(buf, dl.root); err != nil {
buf.Flush()
writer.Close()
return nil, common.Hash{}, err
if err := rlp.Encode(buffer, dl.root); err != nil {
return common.Hash{}, err
}
accounts := make([]journalAccount, 0, len(dl.accountData))
for hash, blob := range dl.accountData {
accounts = append(accounts, journalAccount{Hash: hash, Blob: blob})
}
if err := rlp.Encode(buf, accounts); err != nil {
buf.Flush()
writer.Close()
return nil, common.Hash{}, err
if err := rlp.Encode(buffer, accounts); err != nil {
return common.Hash{}, err
}
storage := make([]journalStorage, 0, len(dl.storageData))
for hash, slots := range dl.storageData {
@ -247,11 +234,8 @@ func (dl *diffLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
}
storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals})
}
if err := rlp.Encode(buf, storage); err != nil {
buf.Flush()
writer.Close()
return nil, common.Hash{}, err
if err := rlp.Encode(buffer, storage); err != nil {
return common.Hash{}, err
}
buf.Flush()
return writer, base, nil
return base, nil
}

@ -21,7 +21,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"sync"
"github.com/ethereum/go-ethereum/common"
@ -112,10 +111,10 @@ type snapshot interface {
// copying everything.
Update(blockRoot common.Hash, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer
// Journal commits an entire diff hierarchy to disk into a single journal file.
// Journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the snapshot without
// flattening everything down (bad for reorgs).
Journal(path string) (io.WriteCloser, common.Hash, error)
Journal(buffer *bytes.Buffer) (common.Hash, error)
// Stale return whether this layer has become stale (was flattened across) or
// if it's still live.
@ -146,7 +145,7 @@ type Tree struct {
// If the snapshot is missing or inconsistent, the entirety is deleted and will
// be reconstructed from scratch based on the tries in the key-value store, on a
// background thread.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cache int, root common.Hash) *Tree {
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash) *Tree {
// Create a new, empty snapshot tree
snap := &Tree{
diskdb: diskdb,
@ -155,7 +154,7 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cach
layers: make(map[common.Hash]snapshot),
}
// Attempt to load a previously persisted snapshot and rebuild one if failed
head, err := loadSnapshot(diskdb, triedb, journal, cache, root)
head, err := loadSnapshot(diskdb, triedb, cache, root)
if err != nil {
log.Warn("Failed to load snapshot, regenerating", "err", err)
snap.Rebuild(root)
@ -401,6 +400,7 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
// Account was updated, push to disk
rawdb.WriteAccountSnapshot(batch, hash, data)
base.cache.Set(hash[:], data)
snapshotCleanAccountWriteMeter.Mark(int64(len(data)))
if batch.ValueSize() > ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
@ -445,6 +445,7 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
if len(data) > 0 {
rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, data)
base.cache.Set(append(accountHash[:], storageHash[:]...), data)
snapshotCleanStorageWriteMeter.Mark(int64(len(data)))
} else {
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
base.cache.Set(append(accountHash[:], storageHash[:]...), nil)
@ -484,13 +485,13 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
return res
}
// Journal commits an entire diff hierarchy to disk into a single journal file.
// Journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the snapshot without
// flattening everything down (bad for reorgs).
//
// The method returns the root hash of the base layer that needs to be persisted
// to disk as a trie too to allow continuing any pending generation op.
func (t *Tree) Journal(root common.Hash, path string) (common.Hash, error) {
func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
// Retrieve the head snapshot to journal from var snap snapshot
snap := t.Snapshot(root)
if snap == nil {
@ -500,11 +501,14 @@ func (t *Tree) Journal(root common.Hash, path string) (common.Hash, error) {
t.lock.Lock()
defer t.lock.Unlock()
writer, base, err := snap.(snapshot).Journal(path)
journal := new(bytes.Buffer)
base, err := snap.(snapshot).Journal(journal)
if err != nil {
return common.Hash{}, err
}
return base, writer.Close()
// Store the journal into the database and return
rawdb.WriteSnapshotJournal(t.diskdb, journal.Bytes())
return base, nil
}
// Rebuild wipes all available snapshot data from the persistent database and

@ -845,8 +845,8 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
if err := s.snaps.Update(root, parent, s.snapAccounts, s.snapStorage); err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
}
if err := s.snaps.Cap(root, 128); err != nil {
log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
if err := s.snaps.Cap(root, 127); err != nil { // Persistent layer is 128th, the last available trie
log.Warn("Failed to cap snapshot tree", "root", root, "layers", 127, "err", err)
}
}
s.snap, s.snapAccounts, s.snapStorage = nil, nil, nil

Loading…
Cancel
Save