Merge pull request #21804 from karalabe/snapshot-marker-sync

core/state/snapshot: update generator marker in sync with flushes
pull/21807/head
Péter Szilágyi 4 years ago committed by GitHub
commit 7c30f4d085
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/blockchain_snapshot_test.go
  2. 48
      core/state/snapshot/generate.go
  3. 30
      core/state/snapshot/journal.go
  4. 18
      core/state/snapshot/snapshot.go

@ -675,7 +675,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
if _, err := chain.InsertChain(blocks[startPoint:]); err != nil { if _, err := chain.InsertChain(blocks[startPoint:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err) t.Fatalf("Failed to import canonical chain tail: %v", err)
} }
// Set the flag for writing legacy journal if ncessary // Set the flag for writing legacy journal if necessary
if tt.legacy { if tt.legacy {
chain.writeLegacyJournal = true chain.writeLegacyJournal = true
} }
@ -708,7 +708,6 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
} else if tt.gapped > 0 { } else if tt.gapped > 0 {
// Insert blocks without enabling snapshot if gapping is required. // Insert blocks without enabling snapshot if gapping is required.
chain.Stop() chain.Stop()
gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {}) gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {})
// Insert a few more blocks without enabling snapshot // Insert a few more blocks without enabling snapshot
@ -766,6 +765,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
defer chain.Stop() defer chain.Stop()
} else { } else {
chain.Stop() chain.Stop()
// Restart the chain normally // Restart the chain normally
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil { if err != nil {

@ -19,6 +19,7 @@ package snapshot
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"math/big" "math/big"
"time" "time"
@ -116,6 +117,38 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i
return base return base
} }
// journalProgress persists the generator stats into the database to resume later.
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
// Write out the generator marker. Note it's a standalone disk layer generator
// which is not mixed with journal. It's ok if the generator is persisted while
// journal is not.
entry := journalGenerator{
Done: marker == nil,
Marker: marker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(err) // Cannot happen, here to catch dev errors
}
var logstr string
switch len(marker) {
case 0:
logstr = "done"
case common.HashLength:
logstr = fmt.Sprintf("%#x", marker)
default:
logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
}
log.Debug("Journalled generator progress", "progress", logstr)
rawdb.WriteSnapshotGenerator(db, blob)
}
// generate is a background thread that iterates over the state and storage tries, // generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics // constructing the state snapshot. All the arguments are purely for statistics
// gethering and logging, since the method surfs the blocks as they arrive, often // gethering and logging, since the method surfs the blocks as they arrive, often
@ -187,11 +220,15 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
// Only write and set the marker if we actually did something useful // Only write and set the marker if we actually did something useful
if batch.ValueSize() > 0 { if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
marker := accountHash[:]
journalProgress(batch, marker, stats)
batch.Write() batch.Write()
batch.Reset() batch.Reset()
dl.lock.Lock() dl.lock.Lock()
dl.genMarker = accountHash[:] dl.genMarker = marker
dl.lock.Unlock() dl.lock.Unlock()
} }
if abort != nil { if abort != nil {
@ -228,11 +265,15 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
// Only write and set the marker if we actually did something useful // Only write and set the marker if we actually did something useful
if batch.ValueSize() > 0 { if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
marker := append(accountHash[:], storeIt.Key...)
journalProgress(batch, marker, stats)
batch.Write() batch.Write()
batch.Reset() batch.Reset()
dl.lock.Lock() dl.lock.Lock()
dl.genMarker = append(accountHash[:], storeIt.Key...) dl.genMarker = marker
dl.lock.Unlock() dl.lock.Unlock()
} }
if abort != nil { if abort != nil {
@ -264,6 +305,9 @@ func (dl *diskLayer) generate(stats *generatorStats) {
} }
// Snapshot fully generated, set the marker to nil // Snapshot fully generated, set the marker to nil
if batch.ValueSize() > 0 { if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
journalProgress(batch, nil, stats)
batch.Write() batch.Write()
} }
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots, log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,

@ -276,8 +276,8 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r) return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
} }
// Journal writes the persistent layer generator stats into a buffer to be stored // Journal terminates any in-progress snapshot generation, also implicitly pushing
// in the database as the snapshot journal. // the progress into the database.
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
// If the snapshot is currently being generated, abort it // If the snapshot is currently being generated, abort it
var stats *generatorStats var stats *generatorStats
@ -296,25 +296,10 @@ func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
if dl.stale { if dl.stale {
return common.Hash{}, ErrSnapshotStale return common.Hash{}, ErrSnapshotStale
} }
// Write out the generator marker. Note it's a standalone disk layer generator // Ensure the generator stats is written even if none was ran this cycle
// which is not mixed with journal. It's ok if the generator is persisted while journalProgress(dl.diskdb, dl.genMarker, stats)
// journal is not.
entry := journalGenerator{ log.Debug("Journalled disk layer", "root", dl.root)
Done: dl.genMarker == nil,
Marker: dl.genMarker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
return common.Hash{}, err
}
log.Debug("Journalled disk layer", "root", dl.root, "complete", dl.genMarker == nil)
rawdb.WriteSnapshotGenerator(dl.diskdb, blob)
return dl.root, nil return dl.root, nil
} }
@ -401,6 +386,7 @@ func (dl *diskLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
entry.Slots = stats.slots entry.Slots = stats.slots
entry.Storage = uint64(stats.storage) entry.Storage = uint64(stats.storage)
} }
log.Debug("Legacy journalled disk layer", "root", dl.root)
if err := rlp.Encode(buffer, entry); err != nil { if err := rlp.Encode(buffer, entry); err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
@ -455,6 +441,6 @@ func (dl *diffLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
if err := rlp.Encode(buffer, storage); err != nil { if err := rlp.Encode(buffer, storage); err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root()) log.Debug("Legacy journalled disk layer", "root", dl.root, "parent", dl.parent.Root())
return base, nil return base, nil
} }

@ -512,22 +512,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
// Update the snapshot block marker and write any remainder data // Update the snapshot block marker and write any remainder data
rawdb.WriteSnapshotRoot(batch, bottom.root) rawdb.WriteSnapshotRoot(batch, bottom.root)
// Write out the generator marker // Write out the generator progress marker and report
entry := journalGenerator{ journalProgress(batch, base.genMarker, stats)
Done: base.genMarker == nil,
Marker: base.genMarker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(fmt.Sprintf("Failed to RLP encode generator %v", err))
}
rawdb.WriteSnapshotGenerator(batch, blob)
// Flush all the updates in the single db operation. Ensure the // Flush all the updates in the single db operation. Ensure the
// disk layer transition is atomic. // disk layer transition is atomic.

Loading…
Cancel
Save