From 690bd8a41755221b4faa211ec0c78af580b9d093 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 17 Dec 2018 15:23:54 +0800 Subject: [PATCH 1/2] core: re-omit new log event when logs rebirth --- core/blockchain.go | 27 ++++-- core/blockchain_test.go | 206 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 224 insertions(+), 9 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 20ab71b4d..193c297f4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1475,11 +1475,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { commonBlock *types.Block deletedTxs types.Transactions deletedLogs []*types.Log + rebirthLogs []*types.Log // collectLogs collects the logs that were generated during the // processing of the block that corresponds with the given hash. - // These logs are later announced as deleted. - collectLogs = func(hash common.Hash) { - // Coalesce logs and set 'Removed'. + // These logs are later announced as deleted or reborn + collectLogs = func(hash common.Hash, removed bool) { number := bc.hc.GetBlockNumber(hash) if number == nil { return @@ -1487,9 +1487,13 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { receipts := rawdb.ReadReceipts(bc.db, hash, *number) for _, receipt := range receipts { for _, log := range receipt.Logs { - del := *log - del.Removed = true - deletedLogs = append(deletedLogs, &del) + l := *log + if removed { + l.Removed = true + deletedLogs = append(deletedLogs, &l) + } else { + rebirthLogs = append(rebirthLogs, &l) + } } } } @@ -1502,7 +1506,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { oldChain = append(oldChain, oldBlock) deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - collectLogs(oldBlock.Hash()) + collectLogs(oldBlock.Hash(), true) } } else { // reduce new chain and append new chain blocks for inserting later on @@ -1526,7 +1530,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { oldChain = append(oldChain, oldBlock) newChain = append(newChain, newBlock) deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - collectLogs(oldBlock.Hash()) + collectLogs(oldBlock.Hash(), true) oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) if oldBlock == nil { @@ -1552,6 +1556,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for i := len(newChain) - 1; i >= 0; i-- { // insert the block in the canonical way, re-writing history bc.insert(newChain[i]) + // collect reborn logs due to chain reorg(except head block) + if i != 0 { + collectLogs(newChain[i].Hash(), false) + } // write lookup entries for hash based transaction/receipt searches rawdb.WriteTxLookupEntries(bc.db, newChain[i]) addedTxs = append(addedTxs, newChain[i].Transactions()...) @@ -1569,6 +1577,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } + if len(rebirthLogs) > 0 { + go bc.logsFeed.Send(rebirthLogs) + } if len(oldChain) > 0 { go func() { for _, block := range oldChain { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index c8cae969c..4cee3cd85 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -867,7 +867,6 @@ func TestChainTxReorgs(t *testing.T) { } func TestLogReorgs(t *testing.T) { - var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) @@ -913,6 +912,211 @@ func TestLogReorgs(t *testing.T) { } } +func TestLogRebirth(t *testing.T) { + var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + db = ethdb.NewMemDatabase() + // this code generates a log + code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} + genesis = gspec.MustCommit(db) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + newLogCh = make(chan bool) + ) + + // listenNewLog checks whether the received logs number is equal with expected. + listenNewLog := func(sink chan []*types.Log, expect int) { + cnt := 0 + for { + select { + case logs := <-sink: + cnt += len(logs) + case <-time.NewTimer(5 * time.Second).C: + // new logs timeout + newLogCh <- false + return + } + if cnt == expect { + break + } else if cnt > expect { + // redundant logs received + newLogCh <- false + return + } + } + select { + case <-sink: + // redundant logs received + newLogCh <- false + case <-time.NewTimer(100 * time.Millisecond).C: + newLogCh <- true + } + } + + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer blockchain.Stop() + + logsCh := make(chan []*types.Log) + blockchain.SubscribeLogsEvent(logsCh) + + rmLogsCh := make(chan RemovedLogsEvent) + blockchain.SubscribeRemovedLogsEvent(rmLogsCh) + + chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + } + }) + + // Spawn a goroutine to receive log events + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(chain); err != nil { + t.Fatalf("failed to insert chain: %v", err) + } + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } + + // Generate long reorg chain + forkChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + // Higher block difficulty + gen.OffsetTime(-9) + } + }) + + // Spawn a goroutine to receive log events + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(forkChain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } + // Ensure removedLog events received + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-time.NewTimer(1 * time.Second).C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") + } + + newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(newBlocks); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + // Rebirth logs should omit a newLogEvent + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } + // Ensure removedLog events received + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-time.NewTimer(1 * time.Second).C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") + } +} + +func TestSideLogRebirth(t *testing.T) { + var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + db = ethdb.NewMemDatabase() + // this code generates a log + code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} + genesis = gspec.MustCommit(db) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + newLogCh = make(chan bool) + ) + + // listenNewLog checks whether the received logs number is equal with expected. + listenNewLog := func(sink chan []*types.Log, expect int) { + cnt := 0 + for { + select { + case logs := <-sink: + cnt += len(logs) + case <-time.NewTimer(5 * time.Second).C: + // new logs timeout + newLogCh <- false + return + } + if cnt == expect { + break + } else if cnt > expect { + // redundant logs received + newLogCh <- false + return + } + } + select { + case <-sink: + // redundant logs received + newLogCh <- false + case <-time.NewTimer(100 * time.Millisecond).C: + newLogCh <- true + } + } + + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer blockchain.Stop() + + logsCh := make(chan []*types.Log) + blockchain.SubscribeLogsEvent(logsCh) + + chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + // Higher block difficulty + gen.OffsetTime(-9) + } + }) + if _, err := blockchain.InsertChain(chain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + + // Generate side chain with lower difficulty + sideChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + } + }) + if _, err := blockchain.InsertChain(sideChain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + + // Generate a new block based on side chain + newBlocks, _ := GenerateChain(params.TestChainConfig, sideChain[len(sideChain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(newBlocks); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + // Rebirth logs should omit a newLogEvent + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } +} + func TestReorgSideEvent(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() From 43631aa1d69091b4ced7f5e942b70cfc5b8e82a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 4 Apr 2019 14:39:11 +0300 Subject: [PATCH 2/2] core: minor code polishes + rebase fixes --- core/blockchain.go | 85 +++++++++++++++++++++++------------------ core/blockchain_test.go | 15 +++++--- 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 193c297f4..4a347ec81 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1465,17 +1465,21 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i return 0, nil, nil, nil } -// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them -// to be part of the new canonical chain and accumulates potential missing transactions and post an -// event about them +// reorg takes two blocks, an old chain and a new chain and will reconstruct the +// blocks and inserts them to be part of the new canonical chain and accumulates +// potential missing transactions and post an event about them. func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { var ( newChain types.Blocks oldChain types.Blocks commonBlock *types.Block - deletedTxs types.Transactions + + deletedTxs types.Transactions + addedTxs types.Transactions + deletedLogs []*types.Log rebirthLogs []*types.Log + // collectLogs collects the logs that were generated during the // processing of the block that corresponds with the given hash. // These logs are later announced as deleted or reborn @@ -1498,46 +1502,49 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } } ) - - // first reduce whoever is higher bound + // Reduce the longer chain to the same number as the shorter one if oldBlock.NumberU64() > newBlock.NumberU64() { - // reduce old chain + // Old chain is longer, gather all transactions and logs as deleted ones for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { oldChain = append(oldChain, oldBlock) deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - collectLogs(oldBlock.Hash(), true) } } else { - // reduce new chain and append new chain blocks for inserting later on + // New chain is longer, stash all blocks away for subsequent insertion for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { newChain = append(newChain, newBlock) } } if oldBlock == nil { - return fmt.Errorf("Invalid old chain") + return fmt.Errorf("invalid old chain") } if newBlock == nil { - return fmt.Errorf("Invalid new chain") + return fmt.Errorf("invalid new chain") } - + // Both sides of the reorg are at the same number, reduce both until the common + // ancestor is found for { + // If the common ancestor was found, bail out if oldBlock.Hash() == newBlock.Hash() { commonBlock = oldBlock break } - + // Remove an old block as well as stash away a new block oldChain = append(oldChain, oldBlock) - newChain = append(newChain, newBlock) deletedTxs = append(deletedTxs, oldBlock.Transactions()...) collectLogs(oldBlock.Hash(), true) - oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) + newChain = append(newChain, newBlock) + + // Step back with both chains + oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) if oldBlock == nil { - return fmt.Errorf("Invalid old chain") + return fmt.Errorf("invalid old chain") } + newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) if newBlock == nil { - return fmt.Errorf("Invalid new chain") + return fmt.Errorf("invalid new chain") } } // Ensure the user sees large reorgs @@ -1552,42 +1559,46 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash()) } // Insert the new chain, taking care of the proper incremental order - var addedTxs types.Transactions for i := len(newChain) - 1; i >= 0; i-- { - // insert the block in the canonical way, re-writing history + // Insert the block in the canonical way, re-writing history bc.insert(newChain[i]) - // collect reborn logs due to chain reorg(except head block) + + // Collect reborn logs due to chain reorg (except head block (reverse order)) if i != 0 { collectLogs(newChain[i].Hash(), false) } - // write lookup entries for hash based transaction/receipt searches + // Write lookup entries for hash based transaction/receipt searches rawdb.WriteTxLookupEntries(bc.db, newChain[i]) addedTxs = append(addedTxs, newChain[i].Transactions()...) } - // calculate the difference between deleted and added transactions - diff := types.TxDifference(deletedTxs, addedTxs) - // When transactions get deleted from the database that means the - // receipts that were created in the fork must also be deleted + // When transactions get deleted from the database, the receipts that were + // created in the fork must also be deleted batch := bc.db.NewBatch() - for _, tx := range diff { + for _, tx := range types.TxDifference(deletedTxs, addedTxs) { rawdb.DeleteTxLookupEntry(batch, tx.Hash()) } batch.Write() - if len(deletedLogs) > 0 { - go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } - if len(rebirthLogs) > 0 { - go bc.logsFeed.Send(rebirthLogs) - } - if len(oldChain) > 0 { - go func() { + // If any logs need to be fired, do it now. In theory we could avoid creating + // this goroutine if there are no events to fire, but realistcally that only + // ever happens if we're reorging empty blocks, which will only happen on idle + // networks where performance is not an issue either way. + // + // TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct + // event ordering? + go func() { + if len(deletedLogs) > 0 { + bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) + } + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(rebirthLogs) + } + if len(oldChain) > 0 { for _, block := range oldChain { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } - }() - } - + } + }() return nil } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4cee3cd85..d6be6c7e8 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/params" ) @@ -916,7 +917,8 @@ func TestLogRebirth(t *testing.T) { var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) - db = ethdb.NewMemDatabase() + db = memorydb.New() + // this code generates a log code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} @@ -1018,10 +1020,6 @@ func TestLogRebirth(t *testing.T) { if _, err := blockchain.InsertChain(newBlocks); err != nil { t.Fatalf("failed to insert forked chain: %v", err) } - // Rebirth logs should omit a newLogEvent - if !<-newLogCh { - t.Fatalf("failed to receive new log event") - } // Ensure removedLog events received select { case ev := <-rmLogsCh: @@ -1031,13 +1029,18 @@ func TestLogRebirth(t *testing.T) { case <-time.NewTimer(1 * time.Second).C: t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") } + // Rebirth logs should omit a newLogEvent + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } } func TestSideLogRebirth(t *testing.T) { var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) - db = ethdb.NewMemDatabase() + db = memorydb.New() + // this code generates a log code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}