From 5a0e1d88f48eb3394505290178afd4ce47e60a8f Mon Sep 17 00:00:00 2001 From: Miro Date: Mon, 4 Oct 2021 08:09:51 -0400 Subject: [PATCH] eth/filters: fix TestPendingLogsSubscription (#23619) The test did not synchronize with per-case goroutines, and thus didn't notice that some tests were just hanging. This change adds missing synchronization and fixes the broken tests. --- eth/filters/filter_system_test.go | 100 ++++++++++++++++++++++-------- 1 file changed, 74 insertions(+), 26 deletions(-) diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index d55674f309..efeab58d7d 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -506,58 +506,80 @@ func TestPendingLogsSubscription(t *testing.T) { }, } + pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64()) + testCases = []struct { crit ethereum.FilterQuery expected []*types.Log c chan []*types.Log sub *Subscription + err chan error }{ // match all { - ethereum.FilterQuery{}, flattenLogs(allLogs), - nil, nil, + ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, + flattenLogs(allLogs), + nil, nil, nil, }, // match none due to no matching addresses { - ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, + ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, nil, - nil, nil, + nil, nil, nil, }, // match logs based on addresses, ignore topics { - ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, + ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, append(flattenLogs(allLogs[:2]), allLogs[5][3]), - nil, nil, + nil, nil, nil, }, // match none due to no matching topics (match with address) { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, + ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, + nil, nil, nil, nil, }, // match logs based on addresses and topics { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, + ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, append(flattenLogs(allLogs[3:5]), allLogs[5][0]), - nil, nil, + nil, nil, nil, }, // match logs based on multiple addresses and "or" topics { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, + ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, append(flattenLogs(allLogs[2:5]), allLogs[5][0]), + nil, nil, nil, + }, + // multiple pending logs, should match only 2 topics from the logs in block 5 + { + ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, + []*types.Log{allLogs[5][0], allLogs[5][2]}, + nil, nil, nil, + }, + // match none due to only matching new mined logs + { + ethereum.FilterQuery{}, nil, + nil, nil, nil, + }, + // match none due to only matching mined logs within a specific block range + { + ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, nil, + nil, nil, nil, }, - // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes + // match all due to matching mined and pending logs { - ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, - append(flattenLogs(allLogs[:2]), allLogs[5][3]), - nil, nil, + ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, + flattenLogs(allLogs), + nil, nil, nil, }, - // multiple pending logs, should match only 2 topics from the logs in block 5 + // match none due to matching logs from a specific block number to new mined blocks { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, - []*types.Log{allLogs[5][0], allLogs[5][2]}, - nil, nil, + ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, + nil, + nil, nil, nil, }, } ) @@ -567,43 +589,69 @@ func TestPendingLogsSubscription(t *testing.T) { // (some) events are posted. for i := range testCases { testCases[i].c = make(chan []*types.Log) - testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) + testCases[i].err = make(chan error) + + var err error + testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) + if err != nil { + t.Fatalf("SubscribeLogs %d failed: %v\n", i, err) + } } for n, test := range testCases { i := n tt := test go func() { + defer tt.sub.Unsubscribe() + var fetched []*types.Log + + timeout := time.After(1 * time.Second) fetchLoop: for { - logs := <-tt.c - fetched = append(fetched, logs...) - if len(fetched) >= len(tt.expected) { + select { + case logs := <-tt.c: + // Do not break early if we've fetched greater, or equal, + // to the number of logs expected. This ensures we do not + // deadlock the filter system because it will do a blocking + // send on this channel if another log arrives. + fetched = append(fetched, logs...) + case <-timeout: break fetchLoop } } if len(fetched) != len(tt.expected) { - panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))) + tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + return } for l := range fetched { if fetched[l].Removed { - panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i)) + tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i) + return } if !reflect.DeepEqual(fetched[l], tt.expected[l]) { - panic(fmt.Sprintf("invalid log on index %d for case %d", l, i)) + tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i) + return } } + tt.err <- nil }() } // raise events - time.Sleep(1 * time.Second) for _, ev := range allLogs { backend.pendingLogsFeed.Send(ev) } + + for i := range testCases { + err := <-testCases[i].err + if err != nil { + t.Fatalf("test %d failed: %v", i, err) + } + <-testCases[i].sub.Err() + } } // TestPendingTxFilterDeadlock tests if the event loop hangs when pending