From 7b7b327ff294201fa4f7cf460c4a73a4212c06cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 9 Nov 2020 16:03:58 +0200 Subject: [PATCH] core/state/snapshot: update generator marker in sync with flushes --- core/blockchain_snapshot_test.go | 4 +-- core/state/snapshot/generate.go | 48 ++++++++++++++++++++++++++++++-- core/state/snapshot/journal.go | 30 ++++++-------------- core/state/snapshot/snapshot.go | 18 ++---------- 4 files changed, 58 insertions(+), 42 deletions(-) diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index 5a986bf52c..e8d3b2470a 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -675,7 +675,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) { if _, err := chain.InsertChain(blocks[startPoint:]); err != nil { t.Fatalf("Failed to import canonical chain tail: %v", err) } - // Set the flag for writing legacy journal if ncessary + // Set the flag for writing legacy journal if necessary if tt.legacy { chain.writeLegacyJournal = true } @@ -708,7 +708,6 @@ func testSnapshot(t *testing.T, tt *snapshotTest) { } else if tt.gapped > 0 { // Insert blocks without enabling snapshot if gapping is required. 
chain.Stop() - gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {}) // Insert a few more blocks without enabling snapshot @@ -766,6 +765,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) { defer chain.Stop() } else { chain.Stop() + // Restart the chain normally chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) if err != nil { diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index 566f7d94a8..92c7640c40 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -19,6 +19,7 @@ package snapshot import ( "bytes" "encoding/binary" + "fmt" "math/big" "time" @@ -116,6 +117,38 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i return base } +// journalProgress persists the generator stats into the database to resume later. +func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) { + // Write out the generator marker. Note it's a standalone disk layer generator + // which is not mixed with journal. It's ok if the generator is persisted while + // journal is not. 
+ entry := journalGenerator{ + Done: marker == nil, + Marker: marker, + } + if stats != nil { + entry.Wiping = (stats.wiping != nil) + entry.Accounts = stats.accounts + entry.Slots = stats.slots + entry.Storage = uint64(stats.storage) + } + blob, err := rlp.EncodeToBytes(entry) + if err != nil { + panic(err) // Cannot happen, here to catch dev errors + } + logstr := "empty" // non-nil but zero-length marker: entry.Done is false, so never log it as done + switch { + case marker == nil: // same condition as entry.Done above + logstr = "done" + case len(marker) > common.HashLength: // longer than one hash: log as <first hash>:<remainder> + logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:]) + case len(marker) > 0: // at most one hash long: log the marker as-is, no slicing past its length + logstr = fmt.Sprintf("%#x", marker) + } + log.Debug("Journalled generator progress", "progress", logstr) + rawdb.WriteSnapshotGenerator(db, blob) +} + // generate is a background thread that iterates over the state and storage tries, // constructing the state snapshot. All the arguments are purely for statistics // gethering and logging, since the method surfs the blocks as they arrive, often @@ -187,11 +220,15 @@ func (dl *diskLayer) generate(stats *generatorStats) { if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { // Only write and set the marker if we actually did something useful if batch.ValueSize() > 0 { + // Ensure the generator entry is in sync with the data + marker := accountHash[:] + journalProgress(batch, marker, stats) + batch.Write() batch.Reset() dl.lock.Lock() - dl.genMarker = accountHash[:] + dl.genMarker = marker dl.lock.Unlock() } if abort != nil { @@ -228,11 +265,15 @@ func (dl *diskLayer) generate(stats *generatorStats) { if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { // Only write and set the marker if we actually did something useful if batch.ValueSize() > 0 { + // Ensure the generator entry is in sync with the data + marker := append(accountHash[:], storeIt.Key...) + journalProgress(batch, marker, stats) + batch.Write() batch.Reset() dl.lock.Lock() - dl.genMarker = append(accountHash[:], storeIt.Key...) 
+ dl.genMarker = marker dl.lock.Unlock() } if abort != nil { @@ -264,6 +305,9 @@ func (dl *diskLayer) generate(stats *generatorStats) { } // Snapshot fully generated, set the marker to nil if batch.ValueSize() > 0 { + // Ensure the generator entry is in sync with the data + journalProgress(batch, nil, stats) + batch.Write() } log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots, diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 3fbecdefdc..178ba08902 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -276,8 +276,8 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) { return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r) } -// Journal writes the persistent layer generator stats into a buffer to be stored -// in the database as the snapshot journal. +// Journal terminates any in-progress snapshot generation, also implicitly pushing +// the progress into the database. func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { // If the snapshot is currently being generated, abort it var stats *generatorStats @@ -296,25 +296,10 @@ func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { if dl.stale { return common.Hash{}, ErrSnapshotStale } - // Write out the generator marker. Note it's a standalone disk layer generator - // which is not mixed with journal. It's ok if the generator is persisted while - // journal is not. 
- entry := journalGenerator{ - Done: dl.genMarker == nil, - Marker: dl.genMarker, - } - if stats != nil { - entry.Wiping = (stats.wiping != nil) - entry.Accounts = stats.accounts - entry.Slots = stats.slots - entry.Storage = uint64(stats.storage) - } - blob, err := rlp.EncodeToBytes(entry) - if err != nil { - return common.Hash{}, err - } - log.Debug("Journalled disk layer", "root", dl.root, "complete", dl.genMarker == nil) - rawdb.WriteSnapshotGenerator(dl.diskdb, blob) + // Ensure the generator stats are written even if no generation ran this cycle + journalProgress(dl.diskdb, dl.genMarker, stats) + + log.Debug("Journalled disk layer", "root", dl.root) return dl.root, nil } @@ -401,6 +386,7 @@ func (dl *diskLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) { entry.Slots = stats.slots entry.Storage = uint64(stats.storage) } + log.Debug("Legacy journalled disk layer", "root", dl.root) if err := rlp.Encode(buffer, entry); err != nil { return common.Hash{}, err } @@ -455,6 +441,6 @@ func (dl *diffLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) { if err := rlp.Encode(buffer, storage); err != nil { return common.Hash{}, err } - log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root()) + log.Debug("Legacy journalled diff layer", "root", dl.root, "parent", dl.parent.Root()) return base, nil } diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 6ad4451ea3..60b4158b56 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -512,22 +512,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer { // Update the snapshot block marker and write any remainder data rawdb.WriteSnapshotRoot(batch, bottom.root) - // Write out the generator marker - entry := journalGenerator{ - Done: base.genMarker == nil, - Marker: base.genMarker, - } - if stats != nil { - entry.Wiping = (stats.wiping != nil) - entry.Accounts = stats.accounts - entry.Slots = stats.slots - entry.Storage = 
uint64(stats.storage) - } - blob, err := rlp.EncodeToBytes(entry) - if err != nil { - panic(fmt.Sprintf("Failed to RLP encode generator %v", err)) - } - rawdb.WriteSnapshotGenerator(batch, blob) + // Write out the generator progress marker and report + journalProgress(batch, base.genMarker, stats) // Flush all the updates in the single db operation. Ensure the // disk layer transition is atomic.