From ca473b81cbe4a96cde4e8424c49b15ab304787bb Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 4 Mar 2024 22:25:53 +0800 Subject: [PATCH] core: use finalized block as the chain freeze indicator (#28683) * core: use finalized block as the chain freeze indicator * core/rawdb: use max(finality, head-90k) as chain freezing threshold * core/rawdb: fix tests * core/rawdb: fix lint * core/rawdb: address comments from peter * core/rawdb: fix typo --- core/blockchain_repair_test.go | 10 ++- core/blockchain_sethead_test.go | 8 ++- core/rawdb/chain_freezer.go | 113 ++++++++++++++++++++------------ core/rawdb/database.go | 8 +-- 4 files changed, 84 insertions(+), 55 deletions(-) diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index b2df39d17b..b6a299f8ba 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1757,7 +1757,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string) { // It's hard to follow the test case, visualize the input - //log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) // fmt.Println(tt.dump(true)) // Create a temporary persistent database @@ -1830,10 +1830,14 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s } // Force run a freeze cycle type freezer interface { - Freeze(threshold uint64) error + Freeze() error Ancients() (uint64, error) } - db.(freezer).Freeze(tt.freezeThreshold) + if tt.freezeThreshold < uint64(tt.canonicalBlocks) { + final := uint64(tt.canonicalBlocks) - tt.freezeThreshold + chain.SetFinalized(canonblocks[int(final)-1].Header()) + } + db.(freezer).Freeze() // Set the simulated pivot block if tt.pivotBlock != nil { diff --git a/core/blockchain_sethead_test.go b/core/blockchain_sethead_test.go index 1504c74e0e..b96ee12c99 100644 --- a/core/blockchain_sethead_test.go +++ b/core/blockchain_sethead_test.go @@ -2044,10 +2044,14 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme // Force run a freeze cycle type freezer interface { - Freeze(threshold uint64) error + Freeze() error Ancients() (uint64, error) } - db.(freezer).Freeze(tt.freezeThreshold) + if tt.freezeThreshold < uint64(tt.canonicalBlocks) { + final := uint64(tt.canonicalBlocks) - tt.freezeThreshold + chain.SetFinalized(canonblocks[int(final)-1].Header()) + } + db.(freezer).Freeze() // Set the simulated pivot block if tt.pivotBlock != nil { diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index bb2c409dbb..d8214874bd 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -17,9 +17,9 @@ package rawdb import ( + "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -43,8 +43,6 @@ const ( // The background thread will keep moving ancient chain segments from key-value // database to flat files for saving space on live database. type chainFreezer struct { - threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) - *Freezer quit chan struct{} wg sync.WaitGroup @@ -57,13 +55,11 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre if err != nil { return nil, err } - cf := chainFreezer{ + return &chainFreezer{ Freezer: freezer, quit: make(chan struct{}), trigger: make(chan chan struct{}), - } - cf.threshold.Store(params.FullImmutabilityThreshold) - return &cf, nil + }, nil } // Close closes the chain freezer instance and terminates the background thread. @@ -77,6 +73,57 @@ func (f *chainFreezer) Close() error { return f.Freezer.Close() } +// readHeadNumber returns the number of chain head block. 0 is returned if the +// block is unknown or not available yet. +func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 { + hash := ReadHeadBlockHash(db) + if hash == (common.Hash{}) { + log.Error("Head block is not reachable") + return 0 + } + number := ReadHeaderNumber(db, hash) + if number == nil { + log.Error("Number of head block is missing") + return 0 + } + return *number +} + +// readFinalizedNumber returns the number of finalized block. 0 is returned +// if the block is unknown or not available yet. +func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 { + hash := ReadFinalizedBlockHash(db) + if hash == (common.Hash{}) { + return 0 + } + number := ReadHeaderNumber(db, hash) + if number == nil { + log.Error("Number of finalized block is missing") + return 0 + } + return *number +} + +// freezeThreshold returns the threshold for chain freezing. It's determined +// by formula: max(finality, HEAD-params.FullImmutabilityThreshold). +func (f *chainFreezer) freezeThreshold(db ethdb.KeyValueReader) (uint64, error) { + var ( + head = f.readHeadNumber(db) + final = f.readFinalizedNumber(db) + headLimit uint64 + ) + if head > params.FullImmutabilityThreshold { + headLimit = head - params.FullImmutabilityThreshold + } + if final == 0 && headLimit == 0 { + return 0, errors.New("freezing threshold is not available") + } + if final > headLimit { + return final, nil + } + return headLimit, nil +} + // freeze is a background thread that periodically checks the blockchain for any // import progress and moves ancient data from the fast database into the freezer. // @@ -114,60 +161,39 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) { return } } - // Retrieve the freezing threshold. - hash := ReadHeadBlockHash(nfdb) - if hash == (common.Hash{}) { - log.Debug("Current full block hash unavailable") // new chain, empty database + threshold, err := f.freezeThreshold(nfdb) + if err != nil { backoff = true + log.Debug("Current full block not old enough to freeze", "err", err) continue } - number := ReadHeaderNumber(nfdb, hash) - threshold := f.threshold.Load() frozen := f.frozen.Load() - switch { - case number == nil: - log.Error("Current full block number unavailable", "hash", hash) - backoff = true - continue - case *number < threshold: - log.Debug("Current full block not old enough to freeze", "number", *number, "hash", hash, "delay", threshold) - backoff = true - continue - - case *number-threshold <= frozen: - log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen) + // Short circuit if the blocks below threshold are already frozen. + if frozen != 0 && frozen-1 >= threshold { backoff = true + log.Debug("Ancient blocks frozen already", "threshold", threshold, "frozen", frozen) continue } - head := ReadHeader(nfdb, hash, *number) - if head == nil { - log.Error("Current full block unavailable", "number", *number, "hash", hash) - backoff = true - continue - } - // Seems we have data ready to be frozen, process in usable batches var ( - start = time.Now() - first, _ = f.Ancients() - limit = *number - threshold + start = time.Now() + first = frozen // the first block to freeze + last = threshold // the last block to freeze ) - if limit-first > freezerBatchLimit { - limit = first + freezerBatchLimit + if last-first+1 > freezerBatchLimit { + last = freezerBatchLimit + first - 1 } - ancients, err := f.freezeRange(nfdb, first, limit) + ancients, err := f.freezeRange(nfdb, first, last) if err != nil { log.Error("Error in block freeze operation", "err", err) backoff = true continue } - // Batch of blocks have been frozen, flush them before wiping from leveldb if err := f.Sync(); err != nil { log.Crit("Failed to flush frozen tables", "err", err) } - // Wipe out all data from the active database batch := db.NewBatch() for i := 0; i < len(ancients); i++ { @@ -250,8 +276,11 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) { } } +// freezeRange moves a batch of chain segments from the fast database to the freezer. +// The parameters (number, limit) specify the relevant block range, both of which +// are included. func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) { - hashes = make([]common.Hash, 0, limit-number) + hashes = make([]common.Hash, 0, limit-number+1) _, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error { for ; number <= limit; number++ { @@ -293,11 +322,9 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil { return fmt.Errorf("can't write td to Freezer: %v", err) } - hashes = append(hashes, hash) } return nil }) - return hashes, err } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 27a9ec7412..9cab30bfcd 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -66,16 +66,10 @@ func (frdb *freezerdb) Close() error { // Freeze is a helper method used for external testing to trigger and block until // a freeze cycle completes, without having to sleep for a minute to trigger the // automatic background run. -func (frdb *freezerdb) Freeze(threshold uint64) error { +func (frdb *freezerdb) Freeze() error { if frdb.AncientStore.(*chainFreezer).readonly { return errReadOnly } - // Set the freezer threshold to a temporary value - defer func(old uint64) { - frdb.AncientStore.(*chainFreezer).threshold.Store(old) - }(frdb.AncientStore.(*chainFreezer).threshold.Load()) - frdb.AncientStore.(*chainFreezer).threshold.Store(threshold) - // Trigger a freeze cycle and block until it's done trigger := make(chan struct{}, 1) frdb.AncientStore.(*chainFreezer).trigger <- trigger