From 2ab2a9f13116748ca343892f851e3632861c994e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 27 Sep 2017 13:14:52 +0300 Subject: [PATCH] core/bloombits, eth/filters: handle null topics (#15195) When implementing the new bloombits based filter, I've accidentally broke null topics by removing the special casing of common.Hash{} filter rules, which acted as the wildcard topic until now. This PR fixes the regression, but instead of using the magic hash common.Hash{} as the null wildcard, the PR reworks the code to handle nil topics during parsing, converting a JSON null into nil []common.Hash topic. --- core/bloombits/matcher.go | 16 ++++++++++++++-- core/bloombits/matcher_test.go | 28 ++++++++++++++++++++++++++++ eth/filters/api.go | 9 ++++++--- eth/filters/api_test.go | 24 ++++++------------------ eth/filters/filter.go | 22 +++++++--------------- eth/filters/filter_system.go | 5 ----- eth/filters/filter_system_test.go | 12 +++++++----- 7 files changed, 68 insertions(+), 48 deletions(-) diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index f3ed405a61..e33de018a5 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -80,7 +80,8 @@ type Matcher struct { } // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing -// address and topic filtering on them. +// address and topic filtering on them. Setting a filter component to `nil` is +// allowed and will result in that filter rule being skipped (OR 0x11...1). func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher { // Create the matcher instance m := &Matcher{ @@ -95,11 +96,22 @@ func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher { m.filters = nil for _, filter := range filters { + // Gather the bit indexes of the filter rule, special casing the nil filter + if len(filter) == 0 { + continue + } bloomBits := make([]bloomIndexes, len(filter)) for i, clause := range filter { + if clause == nil { + bloomBits = nil + break + } bloomBits[i] = calcBloomIndexes(clause) } - m.filters = append(m.filters, bloomBits) + // Accumulate the filter rules if no nil rule was within + if bloomBits != nil { + m.filters = append(m.filters, bloomBits) + } } // For every bit, create a scheduler to load/download the bit vectors for _, bloomIndexLists := range m.filters { diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go index f0198c4e38..f95d0ea9e0 100644 --- a/core/bloombits/matcher_test.go +++ b/core/bloombits/matcher_test.go @@ -21,10 +21,38 @@ import ( "sync/atomic" "testing" "time" + + "github.com/ethereum/go-ethereum/common" ) const testSectionSize = 4096 +// Tests that wildcard filter rules (nil) can be specified and are handled well. +func TestMatcherWildcards(t *testing.T) { + matcher := NewMatcher(testSectionSize, [][][]byte{ + [][]byte{common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard + [][]byte{common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard + [][]byte{common.Hash{0x01}.Bytes()}, // Plain rule, sanity check + [][]byte{common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule + [][]byte{nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule + [][]byte{nil, nil}, // Wildcard combo, drop rule + [][]byte{}, // Inited wildcard rule, drop rule + nil, // Proper wildcard rule, drop rule + }) + if len(matcher.filters) != 3 { + t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3) + } + if len(matcher.filters[0]) != 2 { + t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2) + } + if len(matcher.filters[1]) != 2 { + t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2) + } + if len(matcher.filters[2]) != 1 { + t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1) + } +} + // Tests the matcher pipeline on a single continuous workflow without interrupts. func TestMatcherContinuous(t *testing.T) { testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75) diff --git a/eth/filters/api.go b/eth/filters/api.go index 6e1d48adb6..03c1d6afc8 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -498,7 +498,6 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { switch topic := t.(type) { case nil: // ignore topic when matching logs - args.Topics[i] = []common.Hash{{}} case string: // match specific topic @@ -507,12 +506,16 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { return err } args.Topics[i] = []common.Hash{top} + case []interface{}: // or case e.g. [null, "topic0", "topic1"] for _, rawTopic := range topic { if rawTopic == nil { - args.Topics[i] = append(args.Topics[i], common.Hash{}) - } else if topic, ok := rawTopic.(string); ok { + // null component, match all + args.Topics[i] = nil + break + } + if topic, ok := rawTopic.(string); ok { parsed, err := decodeTopic(topic) if err != nil { return err diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 068a5ea243..4ae37f9779 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -34,7 +34,6 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { topic0 = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca") topic1 = common.HexToHash("9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3") topic2 = common.HexToHash("6ccae1c4af4152f460ff510e573399795dfab5dcf1fa60d1f33ac8fdc1e480ce") - nullTopic = common.Hash{} ) // default values @@ -150,11 +149,8 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { if test6.Topics[0][0] != topic0 { t.Fatalf("got %x, expected %x", test6.Topics[0][0], topic0) } - if len(test6.Topics[1]) != 1 { - t.Fatalf("expected 1 topic, got %d", len(test6.Topics[1])) - } - if test6.Topics[1][0] != nullTopic { - t.Fatalf("got %x, expected empty hash", test6.Topics[1][0]) + if len(test6.Topics[1]) != 0 { + t.Fatalf("expected 0 topic, got %d", len(test6.Topics[1])) } if len(test6.Topics[2]) != 1 { t.Fatalf("expected 1 topic, got %d", len(test6.Topics[2])) @@ -180,18 +176,10 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { topic0, topic1, test7.Topics[0][0], test7.Topics[0][1], ) } - if len(test7.Topics[1]) != 1 { - t.Fatalf("expected 1 topic, got %d topics", len(test7.Topics[1])) - } - if test7.Topics[1][0] != nullTopic { - t.Fatalf("expected empty hash, got %x", test7.Topics[1][0]) - } - if len(test7.Topics[2]) != 2 { - t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[2])) + if len(test7.Topics[1]) != 0 { + t.Fatalf("expected 0 topic, got %d topics", len(test7.Topics[1])) } - if test7.Topics[2][0] != topic2 || test7.Topics[2][1] != nullTopic { - t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]", - topic2, nullTopic, test7.Topics[2][0], test7.Topics[2][1], - ) + if len(test7.Topics[2]) != 0 { + t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2])) } } diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 4f6c30058e..026cbf95ca 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -60,7 +60,9 @@ type Filter struct { // New creates a new filter which uses a bloom filter on blocks to figure out whether // a particular block is interesting or not. func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { - // Flatten the address and topic filter clauses into a single filter system + // Flatten the address and topic filter clauses into a single bloombits filter + // system. Since the bloombits are not positional, nil topics are permitted, + // which get flattened into a nil byte slice. var filters [][][]byte if len(addresses) > 0 { filter := make([][]byte, len(addresses)) @@ -235,32 +237,24 @@ Logs: if len(addresses) > 0 && !includes(addresses, log.Address) { continue } - - logTopics := make([]common.Hash, len(topics)) - copy(logTopics, log.Topics) - // If the to filtered topics is greater than the amount of topics in logs, skip. if len(topics) > len(log.Topics) { continue Logs } - for i, topics := range topics { - var match bool + match := len(topics) == 0 // empty rule set == wildcard for _, topic := range topics { - // common.Hash{} is a match all (wildcard) - if (topic == common.Hash{}) || log.Topics[i] == topic { + if log.Topics[i] == topic { match = true break } } - if !match { continue Logs } } ret = append(ret, log) } - return ret } @@ -273,16 +267,15 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo break } } - if !included { return false } } for _, sub := range topics { - var included bool + included := len(sub) == 0 // empty rule set == wildcard for _, topic := range sub { - if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) { + if types.BloomLookup(bloom, topic) { included = true break } @@ -291,6 +284,5 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo return false } } - return true } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 00ade0ffb2..e08cedb274 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -212,7 +212,6 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -230,7 +229,6 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -248,7 +246,6 @@ func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*ty installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -265,7 +262,6 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } @@ -282,7 +278,6 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr installed: make(chan struct{}), err: make(chan error), } - return es.subscribe(sub) } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 664ce07a5f..bc3511f23a 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -363,7 +363,7 @@ func TestLogFilter(t *testing.T) { // match all 0: {FilterCriteria{}, allLogs, ""}, // match none due to no matching addresses - 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, []*types.Log{}, ""}, + 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""}, // match logs based on addresses, ignore topics 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, // match none due to no matching topics (match with address) @@ -384,6 +384,8 @@ func TestLogFilter(t *testing.T) { 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, // all "mined" and pending logs with topic firstTopic 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""}, + // match all logs due to wildcard topic + 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, } ) @@ -459,7 +461,7 @@ func TestPendingLogsSubscription(t *testing.T) { firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") - forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") + fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") allLogs = []core.PendingLogsEvent{ @@ -471,7 +473,7 @@ func TestPendingLogsSubscription(t *testing.T) { {Logs: []*types.Log{ {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{forthTopic}, BlockNumber: 5}, + {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5}, {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, }}, } @@ -493,7 +495,7 @@ func TestPendingLogsSubscription(t *testing.T) { // match all {FilterCriteria{}, convertLogs(allLogs), nil, nil}, // match none due to no matching addresses - {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{{}}}, []*types.Log{}, nil, nil}, + {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil}, // match logs based on addresses, ignore topics {FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, // match none due to no matching topics (match with address) @@ -505,7 +507,7 @@ func TestPendingLogsSubscription(t *testing.T) { // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, // multiple pending logs, should match only 2 topics from the logs in block 5 - {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, forthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, } )