diff --git a/core/blockchain.go b/core/blockchain.go index d580d708d9..02c0bbaad1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -23,6 +23,7 @@ import ( "io" "math/big" "runtime" + "slices" "strings" "sync" "sync/atomic" @@ -1435,7 +1436,7 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e func (bc *BlockChain) writeKnownBlock(block *types.Block) error { current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { - if err := bc.reorg(current, block); err != nil { + if err := bc.reorg(current, block.Header()); err != nil { return err } } @@ -1541,7 +1542,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { - if err := bc.reorg(currentBlock, block); err != nil { + if err := bc.reorg(currentBlock, block.Header()); err != nil { return NonStatTy, err } } @@ -2154,8 +2155,8 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co return block.Hash(), nil } -// collectLogs collects the logs that were generated or removed during -// the processing of a block. These logs are later announced as deleted or reborn. +// collectLogs collects the logs that were generated or removed during the +// processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { var blobGasPrice *big.Int excessBlobGas := b.ExcessBlobGas() @@ -2181,70 +2182,55 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { // reorg takes two blocks, an old chain and a new chain and will reconstruct the // blocks and inserts them to be part of the new canonical chain and accumulates // potential missing transactions and post an event about them. +// // Note the new head block won't be processed here, callers need to handle it // externally. -func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { +func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error { var ( - newChain types.Blocks - oldChain types.Blocks - commonBlock *types.Block - - deletedTxs []common.Hash - addedTxs []common.Hash + newChain []*types.Header + oldChain []*types.Header + commonBlock *types.Header ) - oldBlock := bc.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) - if oldBlock == nil { - return errors.New("current head block missing") - } - newBlock := newHead - // Reduce the longer chain to the same number as the shorter one - if oldBlock.NumberU64() > newBlock.NumberU64() { + if oldHead.Number.Uint64() > newHead.Number.Uint64() { // Old chain is longer, gather all transactions and logs as deleted ones - for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { - oldChain = append(oldChain, oldBlock) - for _, tx := range oldBlock.Transactions() { - deletedTxs = append(deletedTxs, tx.Hash()) - } + for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) { + oldChain = append(oldChain, oldHead) } } else { // New chain is longer, stash all blocks away for subsequent insertion - for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock) + for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) { + newChain = append(newChain, newHead) } } - if oldBlock == nil { + if oldHead == nil { return errInvalidOldChain } - if newBlock == nil { + if newHead == nil { return errInvalidNewChain } // Both sides of the reorg are at the same number, reduce both until the common // ancestor is found for { // If the common ancestor was found, bail out - if oldBlock.Hash() == newBlock.Hash() { - commonBlock = oldBlock + if oldHead.Hash() == newHead.Hash() { + commonBlock = oldHead break } // Remove an old block as well as stash away a new block - oldChain = append(oldChain, oldBlock) - for _, tx := range oldBlock.Transactions() { - deletedTxs = append(deletedTxs, tx.Hash()) - } - newChain = append(newChain, newBlock) + oldChain = append(oldChain, oldHead) + newChain = append(newChain, newHead) // Step back with both chains - oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) - if oldBlock == nil { + oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) + if oldHead == nil { return errInvalidOldChain } - newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) - if newBlock == nil { + newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) + if newHead == nil { return errInvalidNewChain } } - // Ensure the user sees large reorgs if len(oldChain) > 0 && len(newChain) > 0 { logFn := log.Info @@ -2253,7 +2239,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { msg = "Large chain reorg detected" logFn = log.Warn } - logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(), + logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash(), "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) @@ -2261,55 +2247,112 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { } else if len(newChain) > 0 { // Special case happens in the post merge stage that current head is // the ancestor of new head while these two blocks are not consecutive - log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash()) + log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) } else { // len(newChain) == 0 && len(oldChain) > 0 // rewind the canonical chain to a lower point. - log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain)) + log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain)) } // Acquire the tx-lookup lock before mutation. This step is essential // as the txlookups should be changed atomically, and all subsequent // reads should be blocked until the mutation is complete. bc.txLookupLock.Lock() - // Insert the new chain segment in incremental order, from the old - // to the new. The new chain head (newChain[0]) is not inserted here, - // as it will be handled separately outside of this function - for i := len(newChain) - 1; i >= 1; i-- { - // Insert the block in the canonical way, re-writing history - bc.writeHeadBlock(newChain[i]) + // Reorg can be executed, start reducing the chain's old blocks and appending + // the new blocks + var ( + deletedTxs []common.Hash + rebirthTxs []common.Hash - // Collect the new added transactions. - for _, tx := range newChain[i].Transactions() { - addedTxs = append(addedTxs, tx.Hash()) + deletedLogs []*types.Log + rebirthLogs []*types.Log + ) + // Deleted log emission on the API uses forward order, which is borked, but + // we'll leave it in for legacy reasons. + // + // TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs? + { + for i := len(oldChain) - 1; i >= 0; i-- { + block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) + if block == nil { + return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics + } + if logs := bc.collectLogs(block, true); len(logs) > 0 { + deletedLogs = append(deletedLogs, logs...) + } + if len(deletedLogs) > 512 { + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + deletedLogs = nil + } + } + if len(deletedLogs) > 0 { + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } } + // Undo old blocks in reverse order + for i := 0; i < len(oldChain); i++ { + // Collect all the deleted transactions + block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64()) + if block == nil { + return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics + } + for _, tx := range block.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } + // Collect deleted logs and emit them for new integrations + if logs := bc.collectLogs(block, true); len(logs) > 0 { + // Emit revertals latest first, older then + slices.Reverse(logs) + // TODO(karalabe): Hook into the reverse emission part + } + } + // Apply new blocks in forward order + for i := len(newChain) - 1; i >= 1; i-- { + // Collect all the included transactions + block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64()) + if block == nil { + return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics + } + for _, tx := range block.Transactions() { + rebirthTxs = append(rebirthTxs, tx.Hash()) + } + // Collect inserted logs and emit them + if logs := bc.collectLogs(block, false); len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs...) + } + if len(rebirthLogs) > 512 { + bc.logsFeed.Send(rebirthLogs) + rebirthLogs = nil + } + // Update the head block + bc.writeHeadBlock(block) + } + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(rebirthLogs) + } // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. - var ( - indexesBatch = bc.db.NewBatch() - diffs = types.HashDifference(deletedTxs, addedTxs) - ) - for _, tx := range diffs { - rawdb.DeleteTxLookupEntry(indexesBatch, tx) + batch := bc.db.NewBatch() + for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) { + rawdb.DeleteTxLookupEntry(batch, tx) } // Delete all hash markers that are not part of the new canonical chain. // Because the reorg function does not handle new chain head, all hash // markers greater than or equal to new chain head should be deleted. - number := commonBlock.NumberU64() + number := commonBlock.Number if len(newChain) > 1 { - number = newChain[1].NumberU64() + number = newChain[1].Number } - for i := number + 1; ; i++ { + for i := number.Uint64() + 1; ; i++ { hash := rawdb.ReadCanonicalHash(bc.db, i) if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(indexesBatch, i) + rawdb.DeleteCanonicalHash(batch, i) } - if err := indexesBatch.Write(); err != nil { + if err := batch.Write(); err != nil { log.Crit("Failed to delete useless indexes", "err", err) } // Reset the tx lookup cache to clear stale txlookup cache. @@ -2318,40 +2361,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error { // Release the tx-lookup lock after mutation. bc.txLookupLock.Unlock() - // Send out events for logs from the old canon chain, and 'reborn' - // logs from the new canon chain. The number of logs can be very - // high, so the events are sent in batches of size around 512. - - // Deleted logs + blocks: - var deletedLogs []*types.Log - for i := len(oldChain) - 1; i >= 0; i-- { - // Collect deleted logs for notification - if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 { - deletedLogs = append(deletedLogs, logs...) - } - if len(deletedLogs) > 512 { - bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - deletedLogs = nil - } - } - if len(deletedLogs) > 0 { - bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } - - // New logs: - var rebirthLogs []*types.Log - for i := len(newChain) - 1; i >= 1; i-- { - if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 { - rebirthLogs = append(rebirthLogs, logs...) - } - if len(rebirthLogs) > 512 { - bc.logsFeed.Send(rebirthLogs) - rebirthLogs = nil - } - } - if len(rebirthLogs) > 0 { - bc.logsFeed.Send(rebirthLogs) - } return nil } @@ -2389,7 +2398,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { // Run the reorg if necessary and set the given block as new head. start := time.Now() if head.ParentHash() != bc.CurrentBlock().Hash() { - if err := bc.reorg(bc.CurrentBlock(), head); err != nil { + if err := bc.reorg(bc.CurrentBlock(), head.Header()); err != nil { return common.Hash{}, err } } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 9f491e1bfd..d8f7da0643 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4231,3 +4231,36 @@ func TestPragueRequests(t *testing.T) { t.Fatalf("block %d: failed to insert into chain: %v", n, err) } } + +func BenchmarkReorg(b *testing.B) { + chainLength := b.N + + dir := b.TempDir() + db, err := rawdb.NewLevelDBDatabase(dir, 128, 128, "", false) + if err != nil { + b.Fatalf("cannot create temporary database: %v", err) + } + defer db.Close() + gspec := &Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{benchRootAddr: {Balance: math.BigPow(2, 254)}}, + } + blockchain, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil) + defer blockchain.Stop() + + // Insert an easy and a difficult chain afterwards + easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000)) + diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000)) + + if _, err := blockchain.InsertChain(easyBlocks); err != nil { + b.Fatalf("failed to insert easy chain: %v", err) + } + b.ResetTimer() + if _, err := blockchain.InsertChain(diffBlocks); err != nil { + b.Fatalf("failed to insert difficult chain: %v", err) + } +} + +// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used +// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used +// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used