|
|
|
@ -24,7 +24,6 @@ import ( |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/common" |
|
|
|
|
"github.com/ethereum/go-ethereum/common/bitutil" |
|
|
|
|
"github.com/ethereum/go-ethereum/crypto" |
|
|
|
|
) |
|
|
|
@ -68,8 +67,7 @@ type Retrieval struct { |
|
|
|
|
type Matcher struct { |
|
|
|
|
sectionSize uint64 // Size of the data batches to filter on
|
|
|
|
|
|
|
|
|
|
addresses []bloomIndexes // Addresses the system is filtering for
|
|
|
|
|
topics [][]bloomIndexes // Topics the system is filtering for
|
|
|
|
|
filters [][]bloomIndexes // Filter the system is matching for
|
|
|
|
|
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
|
|
|
|
|
|
|
|
|
|
retrievers chan chan uint // Retriever processes waiting for bit allocations
|
|
|
|
@ -82,7 +80,8 @@ type Matcher struct { |
|
|
|
|
|
|
|
|
|
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
|
|
|
|
|
// address and topic filtering on them.
|
|
|
|
|
func NewMatcher(sectionSize uint64, addresses []common.Address, topics [][]common.Hash) *Matcher { |
|
|
|
|
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher { |
|
|
|
|
// Create the matcher instance
|
|
|
|
|
m := &Matcher{ |
|
|
|
|
sectionSize: sectionSize, |
|
|
|
|
schedulers: make(map[uint]*scheduler), |
|
|
|
@ -91,48 +90,25 @@ func NewMatcher(sectionSize uint64, addresses []common.Address, topics [][]commo |
|
|
|
|
retrievals: make(chan chan *Retrieval), |
|
|
|
|
deliveries: make(chan *Retrieval), |
|
|
|
|
} |
|
|
|
|
m.setAddresses(addresses) |
|
|
|
|
m.setTopics(topics) |
|
|
|
|
return m |
|
|
|
|
} |
|
|
|
|
// Calculate the bloom bit indexes for the groups we're interested in
|
|
|
|
|
m.filters = nil |
|
|
|
|
|
|
|
|
|
// setAddresses configures the matcher to only return logs that are generated
|
|
|
|
|
// from addresses that are included in the given list.
|
|
|
|
|
func (m *Matcher) setAddresses(addresses []common.Address) { |
|
|
|
|
// Calculate the bloom bit indexes for the addresses we're interested in
|
|
|
|
|
m.addresses = make([]bloomIndexes, len(addresses)) |
|
|
|
|
for i, address := range addresses { |
|
|
|
|
m.addresses[i] = calcBloomIndexes(address.Bytes()) |
|
|
|
|
} |
|
|
|
|
// For every bit, create a scheduler to load/download the bit vectors
|
|
|
|
|
for _, bloomIndexList := range m.addresses { |
|
|
|
|
for _, bloomIndex := range bloomIndexList { |
|
|
|
|
m.addScheduler(bloomIndex) |
|
|
|
|
for _, filter := range filters { |
|
|
|
|
bloomBits := make([]bloomIndexes, len(filter)) |
|
|
|
|
for i, clause := range filter { |
|
|
|
|
bloomBits[i] = calcBloomIndexes(clause) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// setTopics configures the matcher to only return logs that have topics matching
|
|
|
|
|
// the given list.
|
|
|
|
|
func (m *Matcher) setTopics(topicsList [][]common.Hash) { |
|
|
|
|
// Calculate the bloom bit indexes for the topics we're interested in
|
|
|
|
|
m.topics = nil |
|
|
|
|
|
|
|
|
|
for _, topics := range topicsList { |
|
|
|
|
bloomBits := make([]bloomIndexes, len(topics)) |
|
|
|
|
for i, topic := range topics { |
|
|
|
|
bloomBits[i] = calcBloomIndexes(topic.Bytes()) |
|
|
|
|
} |
|
|
|
|
m.topics = append(m.topics, bloomBits) |
|
|
|
|
m.filters = append(m.filters, bloomBits) |
|
|
|
|
} |
|
|
|
|
// For every bit, create a scheduler to load/download the bit vectors
|
|
|
|
|
for _, bloomIndexLists := range m.topics { |
|
|
|
|
for _, bloomIndexLists := range m.filters { |
|
|
|
|
for _, bloomIndexList := range bloomIndexLists { |
|
|
|
|
for _, bloomIndex := range bloomIndexList { |
|
|
|
|
m.addScheduler(bloomIndex) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return m |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
|
|
|
|
@ -250,14 +226,10 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
// Assemble the daisy-chained filtering pipeline
|
|
|
|
|
blooms := m.topics |
|
|
|
|
if len(m.addresses) > 0 { |
|
|
|
|
blooms = append([][]bloomIndexes{m.addresses}, blooms...) |
|
|
|
|
} |
|
|
|
|
next := source |
|
|
|
|
dist := make(chan *request, buffer) |
|
|
|
|
|
|
|
|
|
for _, bloom := range blooms { |
|
|
|
|
for _, bloom := range m.filters { |
|
|
|
|
next = m.subMatch(next, dist, bloom, session) |
|
|
|
|
} |
|
|
|
|
// Start the request distribution
|
|
|
|
|