core, eth: merge snap-sync chain download progress logs (#26676)

pull/26748/head
Péter Szilágyi 2 years ago committed by GitHub
parent 7d4db69607
commit 90d25514af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      core/blockchain.go
  2. 2
      core/headerchain.go
  3. 26
      core/rawdb/accessors_chain.go
  4. 30
      core/rawdb/ancient_scheme.go
  5. 10
      core/rawdb/chain_freezer.go
  6. 2
      core/rawdb/chain_iterator.go
  7. 2
      core/rawdb/database.go
  8. 65
      eth/downloader/downloader.go
  9. 9
      eth/downloader/queue.go

@ -1278,7 +1278,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
log.Info("Imported new block receipts", context...)
log.Debug("Imported new block receipts", context...)
return 0, nil
}

@ -389,7 +389,7 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time,
if res.ignored > 0 {
context = append(context, []interface{}{"ignored", res.ignored}...)
}
log.Info("Imported new block headers", context...)
log.Debug("Imported new block headers", context...)
return res.status, err
}

@ -37,7 +37,7 @@ import (
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
data, _ = reader.Ancient(chainFreezerHashTable, number)
data, _ = reader.Ancient(ChainFreezerHashTable, number)
if len(data) == 0 {
// Get it by hash from leveldb
data, _ = db.Get(headerHashKey(number))
@ -334,7 +334,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
}
// read remaining from ancients
max := count * 700
data, err := db.AncientRange(chainFreezerHeaderTable, i+1-count, count, max)
data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, max)
if err == nil && uint64(len(data)) == count {
// the data is on the order [h, h+1, .., n] -- reordering needed
for i := range data {
@ -351,7 +351,7 @@ func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValu
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ = reader.Ancient(chainFreezerHeaderTable, number)
data, _ = reader.Ancient(ChainFreezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return nil
}
@ -427,7 +427,7 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
// isCanon is an internal utility method, to check whether the given number/hash
// is part of the ancient (canon) set.
func isCanon(reader ethdb.AncientReaderOp, number uint64, hash common.Hash) bool {
h, err := reader.Ancient(chainFreezerHashTable, number)
h, err := reader.Ancient(ChainFreezerHashTable, number)
if err != nil {
return false
}
@ -443,7 +443,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerBodiesTable, number)
data, _ = reader.Ancient(ChainFreezerBodiesTable, number)
return nil
}
// If not, try reading from leveldb
@ -458,7 +458,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
data, _ = reader.Ancient(chainFreezerBodiesTable, number)
data, _ = reader.Ancient(ChainFreezerBodiesTable, number)
if len(data) > 0 {
return nil
}
@ -526,7 +526,7 @@ func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerDifficultyTable, number)
data, _ = reader.Ancient(ChainFreezerDifficultyTable, number)
return nil
}
// If not, try reading from leveldb
@ -586,7 +586,7 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerReceiptTable, number)
data, _ = reader.Ancient(ChainFreezerReceiptTable, number)
return nil
}
// If not, try reading from leveldb
@ -787,19 +787,19 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
num := block.NumberU64()
if err := op.AppendRaw(chainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
return fmt.Errorf("can't add block %d hash: %v", num, err)
}
if err := op.Append(chainFreezerHeaderTable, num, header); err != nil {
if err := op.Append(ChainFreezerHeaderTable, num, header); err != nil {
return fmt.Errorf("can't append block header %d: %v", num, err)
}
if err := op.Append(chainFreezerBodiesTable, num, block.Body()); err != nil {
if err := op.Append(ChainFreezerBodiesTable, num, block.Body()); err != nil {
return fmt.Errorf("can't append block body %d: %v", num, err)
}
if err := op.Append(chainFreezerReceiptTable, num, receipts); err != nil {
if err := op.Append(ChainFreezerReceiptTable, num, receipts); err != nil {
return fmt.Errorf("can't append block %d receipts: %v", num, err)
}
if err := op.Append(chainFreezerDifficultyTable, num, td); err != nil {
if err := op.Append(ChainFreezerDifficultyTable, num, td); err != nil {
return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
}
return nil

@ -18,30 +18,30 @@ package rawdb
// The list of table names of chain freezer.
const (
// chainFreezerHeaderTable indicates the name of the freezer header table.
chainFreezerHeaderTable = "headers"
// ChainFreezerHeaderTable indicates the name of the freezer header table.
ChainFreezerHeaderTable = "headers"
// chainFreezerHashTable indicates the name of the freezer canonical hash table.
chainFreezerHashTable = "hashes"
// ChainFreezerHashTable indicates the name of the freezer canonical hash table.
ChainFreezerHashTable = "hashes"
// chainFreezerBodiesTable indicates the name of the freezer block body table.
chainFreezerBodiesTable = "bodies"
// ChainFreezerBodiesTable indicates the name of the freezer block body table.
ChainFreezerBodiesTable = "bodies"
// chainFreezerReceiptTable indicates the name of the freezer receipts table.
chainFreezerReceiptTable = "receipts"
// ChainFreezerReceiptTable indicates the name of the freezer receipts table.
ChainFreezerReceiptTable = "receipts"
// chainFreezerDifficultyTable indicates the name of the freezer total difficulty table.
chainFreezerDifficultyTable = "diffs"
// ChainFreezerDifficultyTable indicates the name of the freezer total difficulty table.
ChainFreezerDifficultyTable = "diffs"
)
// chainFreezerNoSnappy configures whether compression is disabled for the ancient-tables.
// Hashes and difficulties don't compress well.
var chainFreezerNoSnappy = map[string]bool{
chainFreezerHeaderTable: false,
chainFreezerHashTable: true,
chainFreezerBodiesTable: false,
chainFreezerReceiptTable: false,
chainFreezerDifficultyTable: true,
ChainFreezerHeaderTable: false,
ChainFreezerHashTable: true,
ChainFreezerBodiesTable: false,
ChainFreezerReceiptTable: false,
ChainFreezerDifficultyTable: true,
}
// The list of identifiers of ancient stores.

@ -280,19 +280,19 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
}
// Write to the batch.
if err := op.AppendRaw(chainFreezerHashTable, number, hash[:]); err != nil {
if err := op.AppendRaw(ChainFreezerHashTable, number, hash[:]); err != nil {
return fmt.Errorf("can't write hash to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerHeaderTable, number, header); err != nil {
if err := op.AppendRaw(ChainFreezerHeaderTable, number, header); err != nil {
return fmt.Errorf("can't write header to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerBodiesTable, number, body); err != nil {
if err := op.AppendRaw(ChainFreezerBodiesTable, number, body); err != nil {
return fmt.Errorf("can't write body to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerReceiptTable, number, receipts); err != nil {
if err := op.AppendRaw(ChainFreezerReceiptTable, number, receipts); err != nil {
return fmt.Errorf("can't write receipts to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerDifficultyTable, number, td); err != nil {
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
return fmt.Errorf("can't write td to Freezer: %v", err)
}

@ -50,7 +50,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
if i+count > frozen {
count = frozen - i
}
data, err := db.AncientRange(chainFreezerHashTable, i, count, 32*count)
data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count)
if err != nil {
log.Crit("Failed to init database from freezer", "err", err)
}

@ -231,7 +231,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
// If the freezer already contains something, ensure that the genesis blocks
// match, otherwise we might mix up freezers across chains and destroy both
// the freezer and the key-value store.
frgenesis, err := frdb.Ancient(chainFreezerHashTable, 0)
frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0)
if err != nil {
return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err)
} else if !bytes.Equal(kvgenesis, frgenesis) {

@ -154,6 +154,11 @@ type Downloader struct {
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
// Progress reporting metrics
syncStartBlock uint64 // Head snap block when Geth was started
syncStartTime time.Time // Time instance when chain sync started
syncLogTime time.Time // Time instance when status was last reported
}
// LightChain encapsulates functions required to synchronise a light chain.
@ -231,7 +236,9 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentFastBlock().NumberU64(),
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
go dl.stateFetcher()
@ -1614,6 +1621,7 @@ func (d *Downloader) processSnapSyncContent() error {
if len(results) == 0 {
// If pivot sync is done, stop
if oldPivot == nil {
d.reportSnapSyncProgress(true)
return sync.Cancel()
}
// If sync failed, stop
@ -1627,6 +1635,8 @@ func (d *Downloader) processSnapSyncContent() error {
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
d.reportSnapSyncProgress(false)
// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
d.pivotLock.RLock()
@ -1739,7 +1749,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
}
default:
}
// Retrieve the a batch of results to import
// Retrieve the batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting snap-sync blocks", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
@ -1820,3 +1830,56 @@ func (d *Downloader) readHeaderRange(last *types.Header, count int) []*types.Hea
}
return headers
}
// reportSnapSyncProgress calculates various status reports and provides it to the user.
func (d *Downloader) reportSnapSyncProgress(force bool) {
// Initialize the sync start time if it's the first time we're reporting
if d.syncStartTime.IsZero() {
d.syncStartTime = time.Now().Add(-time.Millisecond) // -1ms offset to avoid division by zero
}
// Don't report all the events, just occasionally
if !force && time.Since(d.syncLogTime) < 8*time.Second {
return
}
// Don't report anything until we have a meaningful progress
var (
headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable)
bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable)
receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable)
)
syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes)
if syncedBytes == 0 {
return
}
var (
header = d.blockchain.CurrentHeader()
block = d.blockchain.CurrentFastBlock()
)
syncedBlocks := block.NumberU64() - d.syncStartBlock
if syncedBlocks == 0 {
return
}
// Retrieve the current chain head and calculate the ETA
latest, _, err := d.skeleton.Bounds()
if err != nil {
// We're going to cheat for non-merged networks, but that's fine
latest = d.pivotHeader
}
if latest == nil {
// This should really never happen, but add some defensive code for now.
// TODO(karalabe): Remove it eventually if we don't see it blow.
log.Error("Nil latest block in sync progress report")
return
}
var (
left = latest.Number.Uint64() - block.NumberU64()
eta = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left)
progress = fmt.Sprintf("%.2f%%", float64(block.NumberU64())*100/float64(latest.Number.Uint64()))
headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString())
bodies = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(bodyBytes).TerminalString())
receipts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(receiptBytes).TerminalString())
)
log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta))
d.syncLogTime = time.Now()
}

@ -144,7 +144,7 @@ type queue struct {
active *sync.Cond
closed bool
lastStatLog time.Time
logTime time.Time // Time instance when status was last reported
}
// newQueue creates a new download queue for scheduling block retrieval.
@ -390,11 +390,12 @@ func (q *queue) Results(block bool) []*fetchResult {
}
}
// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
if time.Since(q.logTime) >= 60*time.Second {
q.logTime = time.Now()
info := q.Stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
log.Debug("Downloader queue stats", info...)
}
return results
}

Loading…
Cancel
Save