From f1b00cffc828105c17c0ecacb2074874b752a9a0 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 17 Dec 2018 15:23:54 +0800 Subject: [PATCH] core: re-omit new log event when logs rebirth --- core/blockchain.go | 27 ++++-- core/blockchain_test.go | 206 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 224 insertions(+), 9 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index e40fc39fac..117be8c721 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1401,11 +1401,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { commonBlock *types.Block deletedTxs types.Transactions deletedLogs []*types.Log + rebirthLogs []*types.Log // collectLogs collects the logs that were generated during the // processing of the block that corresponds with the given hash. - // These logs are later announced as deleted. - collectLogs = func(hash common.Hash) { - // Coalesce logs and set 'Removed'. + // These logs are later announced as deleted or reborn + collectLogs = func(hash common.Hash, removed bool) { number := bc.hc.GetBlockNumber(hash) if number == nil { return @@ -1413,9 +1413,13 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { receipts := rawdb.ReadReceipts(bc.db, hash, *number) for _, receipt := range receipts { for _, log := range receipt.Logs { - del := *log - del.Removed = true - deletedLogs = append(deletedLogs, &del) + l := *log + if removed { + l.Removed = true + deletedLogs = append(deletedLogs, &l) + } else { + rebirthLogs = append(rebirthLogs, &l) + } } } } @@ -1428,7 +1432,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { oldChain = append(oldChain, oldBlock) deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - collectLogs(oldBlock.Hash()) + collectLogs(oldBlock.Hash(), true) } } else { // reduce new chain and append new chain blocks for inserting later on @@ -1452,7 +1456,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { oldChain = append(oldChain, oldBlock) newChain = append(newChain, newBlock) deletedTxs = append(deletedTxs, oldBlock.Transactions()...) - collectLogs(oldBlock.Hash()) + collectLogs(oldBlock.Hash(), true) oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) if oldBlock == nil { @@ -1478,6 +1482,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for i := len(newChain) - 1; i >= 0; i-- { // insert the block in the canonical way, re-writing history bc.insert(newChain[i]) + // collect reborn logs due to chain reorg(except head block) + if i != 0 { + collectLogs(newChain[i].Hash(), false) + } // write lookup entries for hash based transaction/receipt searches rawdb.WriteTxLookupEntries(bc.db, newChain[i]) addedTxs = append(addedTxs, newChain[i].Transactions()...) @@ -1495,6 +1503,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } + if len(rebirthLogs) > 0 { + go bc.logsFeed.Send(rebirthLogs) + } if len(oldChain) > 0 { go func() { for _, block := range oldChain { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 504ad0eaf7..7c76f1fc44 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -884,7 +884,6 @@ func TestChainTxReorgs(t *testing.T) { } func TestLogReorgs(t *testing.T) { - var ( key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) @@ -930,6 +929,211 @@ func TestLogReorgs(t *testing.T) { } } +func TestLogRebirth(t *testing.T) { + var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + db = ethdb.NewMemDatabase() + // this code generates a log + code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} + genesis = gspec.MustCommit(db) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + newLogCh = make(chan bool) + ) + + // listenNewLog checks whether the received logs number is equal with expected. + listenNewLog := func(sink chan []*types.Log, expect int) { + cnt := 0 + for { + select { + case logs := <-sink: + cnt += len(logs) + case <-time.NewTimer(5 * time.Second).C: + // new logs timeout + newLogCh <- false + return + } + if cnt == expect { + break + } else if cnt > expect { + // redundant logs received + newLogCh <- false + return + } + } + select { + case <-sink: + // redundant logs received + newLogCh <- false + case <-time.NewTimer(100 * time.Millisecond).C: + newLogCh <- true + } + } + + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer blockchain.Stop() + + logsCh := make(chan []*types.Log) + blockchain.SubscribeLogsEvent(logsCh) + + rmLogsCh := make(chan RemovedLogsEvent) + blockchain.SubscribeRemovedLogsEvent(rmLogsCh) + + chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + } + }) + + // Spawn a goroutine to receive log events + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(chain); err != nil { + t.Fatalf("failed to insert chain: %v", err) + } + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } + + // Generate long reorg chain + forkChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + // Higher block difficulty + gen.OffsetTime(-9) + } + }) + + // Spawn a goroutine to receive log events + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(forkChain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } + // Ensure removedLog events received + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-time.NewTimer(1 * time.Second).C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") + } + + newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(newBlocks); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + // Rebirth logs should omit a newLogEvent + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } + // Ensure removedLog events received + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-time.NewTimer(1 * time.Second).C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") + } +} + +func TestSideLogRebirth(t *testing.T) { + var ( + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + db = ethdb.NewMemDatabase() + // this code generates a log + code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} + genesis = gspec.MustCommit(db) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + newLogCh = make(chan bool) + ) + + // listenNewLog checks whether the received logs number is equal with expected. + listenNewLog := func(sink chan []*types.Log, expect int) { + cnt := 0 + for { + select { + case logs := <-sink: + cnt += len(logs) + case <-time.NewTimer(5 * time.Second).C: + // new logs timeout + newLogCh <- false + return + } + if cnt == expect { + break + } else if cnt > expect { + // redundant logs received + newLogCh <- false + return + } + } + select { + case <-sink: + // redundant logs received + newLogCh <- false + case <-time.NewTimer(100 * time.Millisecond).C: + newLogCh <- true + } + } + + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) + defer blockchain.Stop() + + logsCh := make(chan []*types.Log) + blockchain.SubscribeLogsEvent(logsCh) + + chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + // Higher block difficulty + gen.OffsetTime(-9) + } + }) + if _, err := blockchain.InsertChain(chain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + + // Generate side chain with lower difficulty + sideChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { + if i == 1 { + tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1) + if err != nil { + t.Fatalf("failed to create tx: %v", err) + } + gen.AddTx(tx) + } + }) + if _, err := blockchain.InsertChain(sideChain); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + + // Generate a new block based on side chain + newBlocks, _ := GenerateChain(params.TestChainConfig, sideChain[len(sideChain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) + go listenNewLog(logsCh, 1) + if _, err := blockchain.InsertChain(newBlocks); err != nil { + t.Fatalf("failed to insert forked chain: %v", err) + } + // Rebirth logs should omit a newLogEvent + if !<-newLogCh { + t.Fatalf("failed to receive new log event") + } +} + func TestReorgSideEvent(t *testing.T) { var ( db = ethdb.NewMemDatabase()