mirror of https://github.com/ethereum/go-ethereum
Merge pull request #14631 from zsfelfoldi/bloombits2
core/bloombits, eth/filter: transformed bloom bitmap based log searchpull/15103/head
commit
c4d21bc8e5
@ -0,0 +1,18 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
// Package bloombits implements bloom filtering on batches of data.
|
||||||
|
package bloombits |
@ -0,0 +1,87 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bloombits |
||||||
|
|
||||||
|
import ( |
||||||
|
"errors" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types" |
||||||
|
) |
||||||
|
|
||||||
|
// errSectionOutOfBounds is returned if the user tried to add more bloom filters
// to the batch than available space, or tried to retrieve above the capacity.
var errSectionOutOfBounds = errors.New("section out of bounds")
||||||
|
|
||||||
|
// Generator takes a number of bloom filters and generates the rotated bloom bits
// to be used for batched filtering. Blooms are fed in one at a time via AddBloom;
// each bloom becomes one bit-column of the rotated matrix.
type Generator struct {
	blooms   [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
	sections uint                         // Number of sections to batch together
	nextBit  uint                         // Next bit to set when adding a bloom
}
||||||
|
|
||||||
|
// NewGenerator creates a rotated bloom generator that can iteratively fill a
|
||||||
|
// batched bloom filter's bits.
|
||||||
|
func NewGenerator(sections uint) (*Generator, error) { |
||||||
|
if sections%8 != 0 { |
||||||
|
return nil, errors.New("section count not multiple of 8") |
||||||
|
} |
||||||
|
b := &Generator{sections: sections} |
||||||
|
for i := 0; i < types.BloomBitLength; i++ { |
||||||
|
b.blooms[i] = make([]byte, sections/8) |
||||||
|
} |
||||||
|
return b, nil |
||||||
|
} |
||||||
|
|
||||||
|
// AddBloom takes a single bloom filter and sets the corresponding bit column
|
||||||
|
// in memory accordingly.
|
||||||
|
func (b *Generator) AddBloom(index uint, bloom types.Bloom) error { |
||||||
|
// Make sure we're not adding more bloom filters than our capacity
|
||||||
|
if b.nextBit >= b.sections { |
||||||
|
return errSectionOutOfBounds |
||||||
|
} |
||||||
|
if b.nextBit != index { |
||||||
|
return errors.New("bloom filter with unexpected index") |
||||||
|
} |
||||||
|
// Rotate the bloom and insert into our collection
|
||||||
|
byteIndex := b.nextBit / 8 |
||||||
|
bitMask := byte(1) << byte(7-b.nextBit%8) |
||||||
|
|
||||||
|
for i := 0; i < types.BloomBitLength; i++ { |
||||||
|
bloomByteIndex := types.BloomByteLength - 1 - i/8 |
||||||
|
bloomBitMask := byte(1) << byte(i%8) |
||||||
|
|
||||||
|
if (bloom[bloomByteIndex] & bloomBitMask) != 0 { |
||||||
|
b.blooms[i][byteIndex] |= bitMask |
||||||
|
} |
||||||
|
} |
||||||
|
b.nextBit++ |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// Bitset returns the bit vector belonging to the given bit index after all
|
||||||
|
// blooms have been added.
|
||||||
|
func (b *Generator) Bitset(idx uint) ([]byte, error) { |
||||||
|
if b.nextBit != b.sections { |
||||||
|
return nil, errors.New("bloom not fully generated yet") |
||||||
|
} |
||||||
|
if idx >= b.sections { |
||||||
|
return nil, errSectionOutOfBounds |
||||||
|
} |
||||||
|
return b.blooms[idx], nil |
||||||
|
} |
@ -0,0 +1,60 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bloombits |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"math/rand" |
||||||
|
"testing" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types" |
||||||
|
) |
||||||
|
|
||||||
|
// Tests that batched bloom bits are correctly rotated from the input bloom
// filters.
func TestGenerator(t *testing.T) {
	// Generate the input and the rotated output: input[i] is the i-th bloom,
	// output[k] is the expected rotated bit vector for bloom bit k.
	var input, output [types.BloomBitLength][types.BloomByteLength]byte

	for i := 0; i < types.BloomBitLength; i++ {
		for j := 0; j < types.BloomBitLength; j++ {
			bit := byte(rand.Int() % 2)

			// Bloom i stores bit j MSB-first within its byte; the rotated view
			// stores bloom i's flag at position i of bit j's vector.
			input[i][j/8] |= bit << byte(7-j%8)
			output[types.BloomBitLength-1-j][i/8] |= bit << byte(7-i%8)
		}
	}
	// Crunch the input through the generator and verify the result
	gen, err := NewGenerator(types.BloomBitLength)
	if err != nil {
		t.Fatalf("failed to create bloombit generator: %v", err)
	}
	for i, bloom := range input {
		if err := gen.AddBloom(uint(i), bloom); err != nil {
			t.Fatalf("bloom %d: failed to add: %v", i, err)
		}
	}
	for i, want := range output {
		have, err := gen.Bitset(uint(i))
		if err != nil {
			t.Fatalf("output %d: failed to retrieve bits: %v", i, err)
		}
		if !bytes.Equal(have, want[:]) {
			t.Errorf("output %d: bit vector mismatch have %x, want %x", i, have, want)
		}
	}
}
@ -0,0 +1,615 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bloombits |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"errors" |
||||||
|
"math" |
||||||
|
"sort" |
||||||
|
"sync" |
||||||
|
"sync/atomic" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common/bitutil" |
||||||
|
"github.com/ethereum/go-ethereum/crypto" |
||||||
|
) |
||||||
|
|
||||||
|
// bloomIndexes represents the bit indexes inside the bloom filter that belong
|
||||||
|
// to some key.
|
||||||
|
type bloomIndexes [3]uint |
||||||
|
|
||||||
|
// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
|
||||||
|
func calcBloomIndexes(b []byte) bloomIndexes { |
||||||
|
b = crypto.Keccak256(b) |
||||||
|
|
||||||
|
var idxs bloomIndexes |
||||||
|
for i := 0; i < len(idxs); i++ { |
||||||
|
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1]) |
||||||
|
} |
||||||
|
return idxs |
||||||
|
} |
||||||
|
|
||||||
|
// partialMatches with a non-nil vector represents a section in which some sub-
// matchers have already found potential matches. Subsequent sub-matchers will
// binary AND their matches with this vector. If vector is nil, it represents a
// section to be processed by the first sub-matcher.
type partialMatches struct {
	section uint64 // Section number this partial match belongs to
	bitset  []byte // Per-block match vector accumulated so far (nil = unfiltered)
}
||||||
|
|
||||||
|
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
type Retrieval struct {
	Bit      uint     // Bloom bit index the sections belong to
	Sections []uint64 // Section numbers to fetch (request) or fetched (response)
	Bitsets  [][]byte // Retrieved bit vectors, one per section (delivery only)
}
||||||
|
|
||||||
|
// Matcher is a pipelined system of schedulers and logic matchers which perform
// binary AND/OR operations on the bit-streams, creating a stream of potential
// blocks to inspect for data content.
type Matcher struct {
	sectionSize uint64 // Size of the data batches to filter on

	filters    [][]bloomIndexes    // Filter the system is matching for
	schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits

	retrievers chan chan uint       // Retriever processes waiting for bit allocations
	counters   chan chan uint       // Retriever processes waiting for task count reports
	retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
	deliveries chan *Retrieval      // Retriever processes waiting for task response deliveries

	running uint32 // Atomic flag whether a session is live or not
}
||||||
|
|
||||||
|
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
|
||||||
|
// address and topic filtering on them.
|
||||||
|
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher { |
||||||
|
// Create the matcher instance
|
||||||
|
m := &Matcher{ |
||||||
|
sectionSize: sectionSize, |
||||||
|
schedulers: make(map[uint]*scheduler), |
||||||
|
retrievers: make(chan chan uint), |
||||||
|
counters: make(chan chan uint), |
||||||
|
retrievals: make(chan chan *Retrieval), |
||||||
|
deliveries: make(chan *Retrieval), |
||||||
|
} |
||||||
|
// Calculate the bloom bit indexes for the groups we're interested in
|
||||||
|
m.filters = nil |
||||||
|
|
||||||
|
for _, filter := range filters { |
||||||
|
bloomBits := make([]bloomIndexes, len(filter)) |
||||||
|
for i, clause := range filter { |
||||||
|
bloomBits[i] = calcBloomIndexes(clause) |
||||||
|
} |
||||||
|
m.filters = append(m.filters, bloomBits) |
||||||
|
} |
||||||
|
// For every bit, create a scheduler to load/download the bit vectors
|
||||||
|
for _, bloomIndexLists := range m.filters { |
||||||
|
for _, bloomIndexList := range bloomIndexLists { |
||||||
|
for _, bloomIndex := range bloomIndexList { |
||||||
|
m.addScheduler(bloomIndex) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
return m |
||||||
|
} |
||||||
|
|
||||||
|
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
|
||||||
|
// it has not existed before. If the bit is already selected for filtering, the
|
||||||
|
// existing scheduler can be used.
|
||||||
|
func (m *Matcher) addScheduler(idx uint) { |
||||||
|
if _, ok := m.schedulers[idx]; ok { |
||||||
|
return |
||||||
|
} |
||||||
|
m.schedulers[idx] = newScheduler(idx) |
||||||
|
} |
||||||
|
|
||||||
|
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
	// Make sure we're not creating concurrent sessions
	if atomic.SwapUint32(&m.running, 1) == 1 {
		return nil, errors.New("matcher already running")
	}
	defer atomic.StoreUint32(&m.running, 0)

	// Initiate a new matching round
	session := &MatcherSession{
		matcher: m,
		quit:    make(chan struct{}),
		kill:    make(chan struct{}),
	}
	// Clear any leftover state from a previous run before wiring the pipeline
	for _, scheduler := range m.schedulers {
		scheduler.reset()
	}
	// The result channel's capacity is used to size all internal buffers
	sink := m.run(begin, end, cap(results), session)

	// Read the output from the result sink and deliver to the user
	session.pend.Add(1)
	go func() {
		defer session.pend.Done()
		defer close(results)

		for {
			select {
			case <-session.quit:
				return

			case res, ok := <-sink:
				// New match result found
				if !ok {
					return
				}
				// Calculate the first and last blocks of the section, clamped
				// to the user-requested [begin, end] range
				sectionStart := res.section * m.sectionSize

				first := sectionStart
				if begin > first {
					first = begin
				}
				last := sectionStart + m.sectionSize - 1
				if end < last {
					last = end
				}
				// Iterate over all the blocks in the section and return the matching ones
				for i := first; i <= last; i++ {
					// Skip the entire byte if no matches are found inside.
					// NOTE(review): the +7 relies on i being byte-aligned here;
					// since first may be mid-byte this skips to the next byte
					// boundary only approximately — confirm against upstream.
					next := res.bitset[(i-sectionStart)/8]
					if next == 0 {
						i += 7
						continue
					}
					// Some bit is set, do the actual submatching
					if bit := 7 - i%8; next&(1<<bit) != 0 {
						select {
						case <-session.quit:
							return
						case results <- i:
						}
					}
				}
			}
		}
	}()
	return session, nil
}
||||||
|
|
||||||
|
// run creates a daisy-chain of sub-matchers, one for the address set and one
// for each topic set, each sub-matcher receiving a section only if the previous
// ones have all found a potential match in one of the blocks of the section,
// then binary AND-ing its own matches and forwarding the result to the next one.
//
// The method starts feeding the section indexes into the first sub-matcher on a
// new goroutine and returns a sink channel receiving the results.
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
	// Create the source channel and feed section indexes into it. Each seed is
	// an all-ones bitset so the first sub-matcher filters every block.
	source := make(chan *partialMatches, buffer)

	session.pend.Add(1)
	go func() {
		defer session.pend.Done()
		defer close(source)

		for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
			select {
			case <-session.quit:
				return
			case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
			}
		}
	}()
	// Assemble the daisy-chained filtering pipeline: each filter group becomes
	// one sub-matcher link consuming the previous link's output.
	next := source
	dist := make(chan *request, buffer)

	for _, bloom := range m.filters {
		next = m.subMatch(next, dist, bloom, session)
	}
	// Start the request distribution
	session.pend.Add(1)
	go m.distributor(dist, session)

	return next
}
||||||
|
|
||||||
|
// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
// that address/topic, and binary AND-ing those vectors together.
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
	// Start the concurrent schedulers for each bit required by the bloom filter.
	// sectionSources[i][j] feeds section numbers to the scheduler of clause i's
	// j-th bit; sectionSinks[i][j] receives the fetched bit vectors back.
	sectionSources := make([][3]chan uint64, len(bloom))
	sectionSinks := make([][3]chan []byte, len(bloom))
	for i, bits := range bloom {
		for j, bit := range bits {
			sectionSources[i][j] = make(chan uint64, cap(source))
			sectionSinks[i][j] = make(chan []byte, cap(source))

			m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
		}
	}

	process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
	results := make(chan *partialMatches, cap(source))

	session.pend.Add(2)
	go func() {
		// Tear down the goroutine and terminate all source channels
		defer session.pend.Done()
		defer close(process)

		defer func() {
			for _, bloomSources := range sectionSources {
				for _, bitSource := range bloomSources {
					close(bitSource)
				}
			}
		}()
		// Read sections from the source channel and multiplex into all bit-schedulers
		for {
			select {
			case <-session.quit:
				return

			case subres, ok := <-source:
				// New subresult from previous link
				if !ok {
					return
				}
				// Multiplex the section index to all bit-schedulers
				for _, bloomSources := range sectionSources {
					for _, bitSource := range bloomSources {
						select {
						case <-session.quit:
							return
						case bitSource <- subres.section:
						}
					}
				}
				// Notify the processor that this section will become available
				select {
				case <-session.quit:
					return
				case process <- subres:
				}
			}
		}
	}()

	go func() {
		// Tear down the goroutine and terminate the final sink channel
		defer session.pend.Done()
		defer close(results)

		// Read the source notifications and collect the delivered results
		for {
			select {
			case <-session.quit:
				return

			case subres, ok := <-process:
				// Notified of a section being retrieved
				if !ok {
					return
				}
				// Gather all the sub-results and merge them together:
				// AND across the three bits of each clause, OR across clauses.
				var orVector []byte
				for _, bloomSinks := range sectionSinks {
					var andVector []byte
					for _, bitSink := range bloomSinks {
						var data []byte
						select {
						case <-session.quit:
							return
						case data = <-bitSink:
						}
						if andVector == nil {
							// Copy the first vector: the scheduler may share it
							andVector = make([]byte, int(m.sectionSize/8))
							copy(andVector, data)
						} else {
							bitutil.ANDBytes(andVector, andVector, data)
						}
					}
					if orVector == nil {
						orVector = andVector
					} else {
						bitutil.ORBytes(orVector, orVector, andVector)
					}
				}

				if orVector == nil {
					// Degenerate case of an empty clause list: match nothing
					orVector = make([]byte, int(m.sectionSize/8))
				}
				if subres.bitset != nil {
					// AND with the partial matches from the previous chain link
					bitutil.ANDBytes(orVector, orVector, subres.bitset)
				}
				if bitutil.TestBytes(orVector) {
					// At least one block may match, forward down the chain
					select {
					case <-session.quit:
						return
					case results <- &partialMatches{subres.section, orVector}:
					}
				}
			}
		}
	}()
	return results
}
||||||
|
|
||||||
|
// distributor receives requests from the schedulers and queues them into a set
// of pending requests, which are assigned to retrievers wanting to fulfil them.
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
	defer session.pend.Done()

	var (
		requests   = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
		unallocs   = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
		retrievers chan chan uint            // Waiting retrievers (toggled to nil if unallocs is empty)
	)
	var (
		allocs   int            // Number of active allocations to handle graceful shutdown requests
		shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
	)

	// assign is a helper method to try to assign a pending bit to an actively
	// listening servicer, or schedule it up for later when one arrives.
	assign := func(bit uint) {
		select {
		case fetcher := <-m.retrievers:
			allocs++
			fetcher <- bit
		default:
			// No retrievers active, start listening for new ones
			retrievers = m.retrievers
			unallocs[bit] = struct{}{}
		}
	}

	for {
		select {
		case <-shutdown:
			// Graceful shutdown requested, wait until all pending requests are honoured
			if allocs == 0 {
				return
			}
			// Disable this case; the deliveries case finishes the shutdown
			shutdown = nil

		case <-session.kill:
			// Pending requests not honoured in time, hard terminate
			return

		case req := <-dist:
			// New retrieval request arrived to be distributed to some fetcher process.
			// Insert it into the per-bit queue, keeping it sorted by section number.
			queue := requests[req.bit]
			index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
			requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)

			// If it's a new bit and we have waiting fetchers, allocate to them
			if len(queue) == 0 {
				assign(req.bit)
			}

		case fetcher := <-retrievers:
			// New retriever arrived, find the lowest section-ed bit to assign
			bit, best := uint(0), uint64(math.MaxUint64)
			for idx := range unallocs {
				if requests[idx][0] < best {
					bit, best = idx, requests[idx][0]
				}
			}
			// Stop tracking this bit (and alloc notifications if no more work is available)
			delete(unallocs, bit)
			if len(unallocs) == 0 {
				retrievers = nil
			}
			allocs++
			fetcher <- bit

		case fetcher := <-m.counters:
			// New task count request arrives: read the bit from the channel,
			// then reply with the number of queued sections for it
			fetcher <- uint(len(requests[<-fetcher]))

		case fetcher := <-m.retrievals:
			// New fetcher waiting for tasks to retrieve, assign up to the
			// number of sections it asked for (len(task.Sections))
			task := <-fetcher
			if want := len(task.Sections); want >= len(requests[task.Bit]) {
				task.Sections = requests[task.Bit]
				delete(requests, task.Bit)
			} else {
				task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
				requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
			}
			fetcher <- task

			// If anything was left unallocated, try to assign to someone else
			if len(requests[task.Bit]) > 0 {
				assign(task.Bit)
			}

		case result := <-m.deliveries:
			// New retrieval task response from fetcher, split out missing sections and
			// deliver complete ones
			var (
				sections = make([]uint64, 0, len(result.Sections))
				bitsets  = make([][]byte, 0, len(result.Bitsets))
				missing  = make([]uint64, 0, len(result.Sections))
			)
			for i, bitset := range result.Bitsets {
				if len(bitset) == 0 {
					// Empty bitset marks a section the fetcher failed to serve
					missing = append(missing, result.Sections[i])
					continue
				}
				sections = append(sections, result.Sections[i])
				bitsets = append(bitsets, bitset)
			}
			m.schedulers[result.Bit].deliver(sections, bitsets)
			allocs--

			// Reschedule missing sections and allocate bit if newly available
			if len(missing) > 0 {
				queue := requests[result.Bit]
				for _, section := range missing {
					index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
					queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
				}
				requests[result.Bit] = queue

				// Bit had no queued work before this re-insertion: re-announce it
				if len(queue) == len(missing) {
					assign(result.Bit)
				}
			}
			// If we're in the process of shutting down, terminate
			if allocs == 0 && shutdown == nil {
				return
			}
		}
	}
}
||||||
|
|
||||||
|
// MatcherSession is returned by a started matcher to be used as a terminator
// for the actively running matching operation.
type MatcherSession struct {
	matcher *Matcher // Matcher this session belongs to

	quit chan struct{}  // Quit channel to request pipeline termination
	kill chan struct{}  // Term channel to signal non-graceful forced shutdown
	pend sync.WaitGroup // Tracks all pipeline goroutines so Close can wait for them
}
||||||
|
|
||||||
|
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close(timeout time.Duration) {
	// Bail out if the matcher is not running (quit already closed), avoiding a
	// double-close panic on repeated Close calls
	select {
	case <-s.quit:
		return
	default:
	}
	// Signal termination and wait for all goroutines to tear down; kill fires
	// after the timeout to force-abort anything still in flight
	close(s.quit)
	time.AfterFunc(timeout, func() { close(s.kill) })
	s.pend.Wait()
}
||||||
|
|
||||||
|
// AllocateRetrieval assigns a bloom bit index to a client process that can either
|
||||||
|
// immediately reuest and fetch the section contents assigned to this bit or wait
|
||||||
|
// a little while for more sections to be requested.
|
||||||
|
func (s *MatcherSession) AllocateRetrieval() (uint, bool) { |
||||||
|
fetcher := make(chan uint) |
||||||
|
|
||||||
|
select { |
||||||
|
case <-s.quit: |
||||||
|
return 0, false |
||||||
|
case s.matcher.retrievers <- fetcher: |
||||||
|
bit, ok := <-fetcher |
||||||
|
return bit, ok |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// PendingSections returns the number of pending section retrievals belonging to
|
||||||
|
// the given bloom bit index.
|
||||||
|
func (s *MatcherSession) PendingSections(bit uint) int { |
||||||
|
fetcher := make(chan uint) |
||||||
|
|
||||||
|
select { |
||||||
|
case <-s.quit: |
||||||
|
return 0 |
||||||
|
case s.matcher.counters <- fetcher: |
||||||
|
fetcher <- bit |
||||||
|
return int(<-fetcher) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// AllocateSections assigns all or part of an already allocated bit-task queue
|
||||||
|
// to the requesting process.
|
||||||
|
func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 { |
||||||
|
fetcher := make(chan *Retrieval) |
||||||
|
|
||||||
|
select { |
||||||
|
case <-s.quit: |
||||||
|
return nil |
||||||
|
case s.matcher.retrievals <- fetcher: |
||||||
|
task := &Retrieval{ |
||||||
|
Bit: bit, |
||||||
|
Sections: make([]uint64, count), |
||||||
|
} |
||||||
|
fetcher <- task |
||||||
|
return (<-fetcher).Sections |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// DeliverSections delivers a batch of section bit-vectors for a specific bloom
|
||||||
|
// bit index to be injected into the processing pipeline.
|
||||||
|
func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) { |
||||||
|
select { |
||||||
|
case <-s.kill: |
||||||
|
return |
||||||
|
case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}: |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
// the requested retrieval queue to be serviced together with other sessions.
//
// This method will block for the lifetime of the session. Even after termination
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
	for {
		// Allocate a new bloom bit index to retrieve data for, stopping when done
		bit, ok := s.AllocateRetrieval()
		if !ok {
			return
		}
		// Bit allocated, throttle a bit if we're below our batch limit, giving
		// the matcher time to queue up more sections for this bit
		if s.PendingSections(bit) < batch {
			select {
			case <-s.quit:
				// Session terminating, we can't meaningfully service, abort.
				// Drain the allocation and deliver an empty response so the
				// distributor's bookkeeping stays consistent.
				s.AllocateSections(bit, 0)
				s.DeliverSections(bit, []uint64{}, [][]byte{})
				return

			case <-time.After(wait):
				// Throttling up, fetch whatever's available
			}
		}
		// Allocate as much as we can handle and request servicing
		sections := s.AllocateSections(bit, batch)
		request := make(chan *Retrieval)

		select {
		case <-s.quit:
			// Session terminating, we can't meaningfully service, abort.
			// Deliver empty bitsets so the allocated sections are accounted for.
			s.DeliverSections(bit, sections, make([][]byte, len(sections)))
			return

		case mux <- request:
			// Retrieval accepted, something must arrive before we're aborting
			request <- &Retrieval{Bit: bit, Sections: sections}

			result := <-request
			s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
		}
	}
}
@ -0,0 +1,242 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bloombits |
||||||
|
|
||||||
|
import ( |
||||||
|
"math/rand" |
||||||
|
"sync/atomic" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
) |
||||||
|
|
||||||
|
// testSectionSize is the number of blocks per bloombits section used by these tests.
const testSectionSize = 4096
||||||
|
|
||||||
|
// Tests the matcher pipeline on a single continuous workflow without interrupts.
// The last argument of each call is the expected retrieval-request count,
// hard-coded for these deterministic filter/block combinations.
func TestMatcherContinuous(t *testing.T) {
	testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75)
	testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81)
	testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36)
}
||||||
|
|
||||||
|
// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
// with the aim of ensuring data items are requested only once. The expected
// retrieval counts are identical to the continuous variant above.
func TestMatcherIntermittent(t *testing.T) {
	testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, true, 75)
	testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, true, 81)
	testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36)
}
||||||
|
|
||||||
|
// Tests the matcher pipeline on random input to hopefully catch anomalies.
|
||||||
|
func TestMatcherRandom(t *testing.T) { |
||||||
|
for i := 0; i < 10; i++ { |
||||||
|
testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 10000, 0) |
||||||
|
testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 10000, 0) |
||||||
|
testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 10000, 0) |
||||||
|
testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 10000, 0) |
||||||
|
testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 10000, 0) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Tests that matching on everything doesn't crash (special case internally).
func TestWildcardMatcher(t *testing.T) {
	// A nil filter system is the wildcard "match everything" case
	testMatcherBothModes(t, nil, 10000, 0)
}
||||||
|
|
||||||
|
// makeRandomIndexes generates a random filter system, composed on multiple filter
|
||||||
|
// criteria, each having one bloom list component for the address and arbitrarilly
|
||||||
|
// many topic bloom list components.
|
||||||
|
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes { |
||||||
|
res := make([][]bloomIndexes, len(lengths)) |
||||||
|
for i, topics := range lengths { |
||||||
|
res[i] = make([]bloomIndexes, topics) |
||||||
|
for j := 0; j < topics; j++ { |
||||||
|
for k := 0; k < len(res[i][j]); k++ { |
||||||
|
res[i][j][k] = uint(rand.Intn(max-1) + 2) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
return res |
||||||
|
} |
||||||
|
|
||||||
|
// testMatcherDiffBatches runs the given matches test in single-delivery and also
|
||||||
|
// in batches delivery mode, verifying that all kinds of deliveries are handled
|
||||||
|
// correctly withn.
|
||||||
|
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32) { |
||||||
|
singleton := testMatcher(t, filter, blocks, intermittent, retrievals, 1) |
||||||
|
batched := testMatcher(t, filter, blocks, intermittent, retrievals, 16) |
||||||
|
|
||||||
|
if singleton != batched { |
||||||
|
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// testMatcherBothModes runs the given matcher test in both continuous as well as
// in intermittent mode, verifying that the request counts match each other.
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64, retrievals uint32) {
	continuous := testMatcher(t, filter, blocks, false, retrievals, 16)
	intermittent := testMatcher(t, filter, blocks, true, retrievals, 16)

	// Restarting the pipeline must not cause duplicate retrievals
	if continuous != intermittent {
		t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
	}
}
||||||
|
|
||||||
|
// testMatcher is a generic tester to run the given matcher test and return the
|
||||||
|
// number of requests made for cross validation between different modes.
|
||||||
|
func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 { |
||||||
|
// Create a new matcher an simulate our explicit random bitsets
|
||||||
|
matcher := NewMatcher(testSectionSize, nil) |
||||||
|
matcher.filters = filter |
||||||
|
|
||||||
|
for _, rule := range filter { |
||||||
|
for _, topic := range rule { |
||||||
|
for _, bit := range topic { |
||||||
|
matcher.addScheduler(bit) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
// Track the number of retrieval requests made
|
||||||
|
var requested uint32 |
||||||
|
|
||||||
|
// Start the matching session for the filter and the retriver goroutines
|
||||||
|
quit := make(chan struct{}) |
||||||
|
matches := make(chan uint64, 16) |
||||||
|
|
||||||
|
session, err := matcher.Start(0, blocks-1, matches) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to stat matcher session: %v", err) |
||||||
|
} |
||||||
|
startRetrievers(session, quit, &requested, maxReqCount) |
||||||
|
|
||||||
|
// Iterate over all the blocks and verify that the pipeline produces the correct matches
|
||||||
|
for i := uint64(0); i < blocks; i++ { |
||||||
|
if expMatch3(filter, i) { |
||||||
|
match, ok := <-matches |
||||||
|
if !ok { |
||||||
|
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i) |
||||||
|
return 0 |
||||||
|
} |
||||||
|
if match != i { |
||||||
|
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match) |
||||||
|
} |
||||||
|
// If we're testing intermittent mode, abort and restart the pipeline
|
||||||
|
if intermittent { |
||||||
|
session.Close(time.Second) |
||||||
|
close(quit) |
||||||
|
|
||||||
|
quit = make(chan struct{}) |
||||||
|
matches = make(chan uint64, 16) |
||||||
|
|
||||||
|
session, err = matcher.Start(i+1, blocks-1, matches) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to stat matcher session: %v", err) |
||||||
|
} |
||||||
|
startRetrievers(session, quit, &requested, maxReqCount) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
// Ensure the result channel is torn down after the last block
|
||||||
|
match, ok := <-matches |
||||||
|
if ok { |
||||||
|
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match) |
||||||
|
} |
||||||
|
// Clean up the session and ensure we match the expected retrieval count
|
||||||
|
session.Close(time.Second) |
||||||
|
close(quit) |
||||||
|
|
||||||
|
if retrievals != 0 && requested != retrievals { |
||||||
|
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals) |
||||||
|
} |
||||||
|
return requested |
||||||
|
} |
||||||
|
|
||||||
|
// startRetrievers starts a batch of goroutines listening for section requests
// and serving them.
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
	requests := make(chan chan *Retrieval)

	for i := 0; i < 10; i++ {
		// Start a multiplexer to test multiple threaded execution
		go session.Multiplex(batch, 100*time.Microsecond, requests)

		// Start a servicer to match the above multiplexer
		go func() {
			for {
				// Wait for a service request or a shutdown
				select {
				case <-quit:
					return

				case request := <-requests:
					task := <-request

					// Synthesize the requested bit-vectors, randomly dropping
					// ~25% of sections to exercise the missing-delivery path
					task.Bitsets = make([][]byte, len(task.Sections))
					for i, section := range task.Sections {
						if rand.Int()%4 != 0 { // Handle occasional missing deliveries
							task.Bitsets[i] = generateBitset(task.Bit, section)
							atomic.AddUint32(retrievals, 1)
						}
					}
					request <- task
				}
			}
		}()
	}
}
||||||
|
|
||||||
|
// generateBitset generates the rotated bitset for the given bloom bit and section
|
||||||
|
// numbers.
|
||||||
|
func generateBitset(bit uint, section uint64) []byte { |
||||||
|
bitset := make([]byte, testSectionSize/8) |
||||||
|
for i := 0; i < len(bitset); i++ { |
||||||
|
for b := 0; b < 8; b++ { |
||||||
|
blockIdx := section*testSectionSize + uint64(i*8+b) |
||||||
|
bitset[i] += bitset[i] |
||||||
|
if (blockIdx % uint64(bit)) == 0 { |
||||||
|
bitset[i]++ |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
return bitset |
||||||
|
} |
||||||
|
|
||||||
|
func expMatch1(filter bloomIndexes, i uint64) bool { |
||||||
|
for _, ii := range filter { |
||||||
|
if (i % uint64(ii)) != 0 { |
||||||
|
return false |
||||||
|
} |
||||||
|
} |
||||||
|
return true |
||||||
|
} |
||||||
|
|
||||||
|
func expMatch2(filter []bloomIndexes, i uint64) bool { |
||||||
|
for _, ii := range filter { |
||||||
|
if expMatch1(ii, i) { |
||||||
|
return true |
||||||
|
} |
||||||
|
} |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
func expMatch3(filter [][]bloomIndexes, i uint64) bool { |
||||||
|
for _, ii := range filter { |
||||||
|
if !expMatch2(ii, i) { |
||||||
|
return false |
||||||
|
} |
||||||
|
} |
||||||
|
return true |
||||||
|
} |
@ -0,0 +1,181 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bloombits |
||||||
|
|
||||||
|
import ( |
||||||
|
"sync" |
||||||
|
) |
||||||
|
|
||||||
|
// request represents a bloom retrieval task to prioritize and pull from the local
// database or remotely from the network.
type request struct {
	section uint64 // Section index to retrieve the bit-vector from
	bit     uint   // Bit index within the section to retrieve the vector of
}
||||||
|
|
||||||
|
// response represents the state of a requested bit-vector through a scheduler.
type response struct {
	cached []byte        // Cached bits to dedup multiple requests; nil while still pending
	done   chan struct{} // Channel to allow waiting for completion; closed on delivery
}
||||||
|
|
||||||
|
// scheduler handles the scheduling of bloom-filter retrieval operations for
// entire section-batches belonging to a single bloom bit. Beside scheduling the
// retrieval operations, this struct also deduplicates the requests and caches
// the results to minimize network/database overhead even in complex filtering
// scenarios.
type scheduler struct {
	bit       uint                 // Index of the bit in the bloom filter this scheduler is responsible for
	responses map[uint64]*response // Currently pending retrieval requests or already cached responses
	lock      sync.Mutex           // Lock protecting the responses from concurrent access
}
||||||
|
|
||||||
|
// newScheduler creates a new bloom-filter retrieval scheduler for a specific
|
||||||
|
// bit index.
|
||||||
|
func newScheduler(idx uint) *scheduler { |
||||||
|
return &scheduler{ |
||||||
|
bit: idx, |
||||||
|
responses: make(map[uint64]*response), |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// run creates a retrieval pipeline, receiving section indexes from sections and
// returning the results in the same order through the done channel. Concurrent
// runs of the same scheduler are allowed, leading to retrieval task deduplication.
func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
	// Create a forwarder channel between requests and responses of the same size as
	// the distribution channel (since that will block the pipeline anyway).
	pend := make(chan uint64, cap(dist))

	// Start the pipeline schedulers to forward between user -> distributor -> user
	wg.Add(2)
	go s.scheduleRequests(sections, dist, pend, quit, wg)
	go s.scheduleDeliveries(pend, done, quit, wg)
}
||||||
|
|
||||||
|
// reset cleans up any leftovers from previous runs. This is required before a
// restart to ensure that no previously requested but never delivered state will
// cause a lockup.
func (s *scheduler) reset() {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Drop entries that were requested but never delivered (cached == nil);
	// completed responses are kept as a dedup cache for the next run.
	for section, res := range s.responses {
		if res.cached == nil {
			delete(s.responses, section)
		}
	}
}
||||||
|
|
||||||
|
// scheduleRequests reads section retrieval requests from the input channel,
// deduplicates the stream and pushes unique retrieval tasks into the distribution
// channel for a database or network layer to honour.
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
	// Clean up the goroutine and pipeline when done
	defer wg.Done()
	defer close(pend)

	// Keep reading and scheduling section requests
	for {
		select {
		case <-quit:
			return

		case section, ok := <-reqs:
			// New section retrieval requested
			if !ok {
				return
			}
			// Deduplicate retrieval requests: only the first requester of a
			// section inserts the pending response and forwards to dist.
			unique := false

			s.lock.Lock()
			if s.responses[section] == nil {
				s.responses[section] = &response{
					done: make(chan struct{}),
				}
				unique = true
			}
			s.lock.Unlock()

			// Schedule the section for retrieval and notify the deliverer to expect this section
			if unique {
				select {
				case <-quit:
					return
				case dist <- &request{bit: s.bit, section: section}:
				}
			}
			// Always notify the deliverer, even for duplicates, so results are
			// forwarded in request order.
			select {
			case <-quit:
				return
			case pend <- section:
			}
		}
	}
}
||||||
|
|
||||||
|
// scheduleDeliveries reads section acceptance notifications and waits for them
// to be delivered, pushing them into the output data buffer.
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
	// Clean up the goroutine and pipeline when done
	defer wg.Done()
	defer close(done)

	// Keep reading notifications and scheduling deliveries
	for {
		select {
		case <-quit:
			return

		case idx, ok := <-pend:
			// New section retrieval pending
			if !ok {
				return
			}
			// Wait until the request is honoured (res.done is closed by deliver)
			s.lock.Lock()
			res := s.responses[idx]
			s.lock.Unlock()

			select {
			case <-quit:
				return
			case <-res.done:
			}
			// Deliver the result
			select {
			case <-quit:
				return
			case done <- res.cached:
			}
		}
	}
}
||||||
|
|
||||||
|
// deliver is called by the request distributor when a reply to a request arrives.
|
||||||
|
func (s *scheduler) deliver(sections []uint64, data [][]byte) { |
||||||
|
s.lock.Lock() |
||||||
|
defer s.lock.Unlock() |
||||||
|
|
||||||
|
for i, section := range sections { |
||||||
|
if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
|
||||||
|
res.cached = data[i] |
||||||
|
close(res.done) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,105 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package bloombits |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"math/big" |
||||||
|
"math/rand" |
||||||
|
"sync" |
||||||
|
"sync/atomic" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
) |
||||||
|
|
||||||
|
// Tests that the scheduler can deduplicate and forward retrieval requests to
// underlying fetchers and serve responses back, irrelevant of the concurrency
// of the requesting clients or serving data fetchers.
func TestSchedulerSingleClientSingleFetcher(t *testing.T) { testScheduler(t, 1, 1, 5000) }
func TestSchedulerSingleClientMultiFetcher(t *testing.T)  { testScheduler(t, 1, 10, 5000) }
func TestSchedulerMultiClientSingleFetcher(t *testing.T)  { testScheduler(t, 10, 1, 5000) }
func TestSchedulerMultiClientMultiFetcher(t *testing.T)   { testScheduler(t, 10, 10, 5000) }
||||||
|
|
||||||
|
// testScheduler runs a scheduler with the given number of concurrent clients
// and serving fetchers, checking that each client receives the correct data in
// order and that requests are deduplicated across clients (total deliveries
// equals the number of distinct sections).
func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
	f := newScheduler(0)

	// Create a batch of handler goroutines that respond to bloom bit requests and
	// deliver them to the scheduler.
	var fetchPend sync.WaitGroup
	fetchPend.Add(fetchers)
	defer fetchPend.Wait()

	fetch := make(chan *request, 16)
	defer close(fetch)

	var delivered uint32
	for i := 0; i < fetchers; i++ {
		go func() {
			defer fetchPend.Done()

			for req := range fetch {
				// Random delay to shuffle delivery ordering across fetchers
				time.Sleep(time.Duration(rand.Intn(int(100 * time.Microsecond))))
				atomic.AddUint32(&delivered, 1)

				f.deliver([]uint64{
					req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
					req.section,                    // Requested data
					req.section,                    // Duplicated data (ensure it doesn't double close anything)
				}, [][]byte{
					[]byte{},
					new(big.Int).SetUint64(req.section).Bytes(),
					new(big.Int).SetUint64(req.section).Bytes(),
				})
			}
		}()
	}
	// Start a batch of goroutines to concurrently run scheduling tasks
	quit := make(chan struct{})

	var pend sync.WaitGroup
	pend.Add(clients)

	for i := 0; i < clients; i++ {
		go func() {
			defer pend.Done()

			in := make(chan uint64, 16)
			out := make(chan []byte, 16)

			f.run(in, fetch, out, quit, &pend)

			// Feed all section indexes, then close to signal completion
			go func() {
				for j := 0; j < requests; j++ {
					in <- uint64(j)
				}
				close(in)
			}()

			// Results must come back in request order with the section number
			// encoded as big-endian bytes
			for j := 0; j < requests; j++ {
				bits := <-out
				if want := new(big.Int).SetUint64(uint64(j)).Bytes(); !bytes.Equal(bits, want) {
					t.Errorf("vector %d: delivered content mismatch: have %x, want %x", j, bits, want)
				}
			}
		}()
	}
	pend.Wait()

	// Deduplication check: exactly one delivery per distinct section
	if have := atomic.LoadUint32(&delivered); int(have) != requests {
		t.Errorf("request count mismatch: have %v, want %v", have, requests)
	}
}
@ -1,74 +0,0 @@ |
|||||||
// Copyright 2015 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package eth |
|
||||||
|
|
||||||
import ( |
|
||||||
"math/big" |
|
||||||
"testing" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common" |
|
||||||
"github.com/ethereum/go-ethereum/core" |
|
||||||
"github.com/ethereum/go-ethereum/core/types" |
|
||||||
"github.com/ethereum/go-ethereum/ethdb" |
|
||||||
"github.com/ethereum/go-ethereum/params" |
|
||||||
) |
|
||||||
|
|
||||||
// TestMipmapUpgrade verifies that addMipmapBloomBins backfills the mipmapped
// bloom bins for an already-stored chain: after the upgrade runs, the level-0
// mipmap bloom must be non-empty and the version marker key must exist.
func TestMipmapUpgrade(t *testing.T) {
	db, _ := ethdb.NewMemDatabase()
	addr := common.BytesToAddress([]byte("jeff"))
	genesis := new(core.Genesis).MustCommit(db)

	// Generate a short chain where blocks 1 and 2 carry a receipt logging addr
	chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
		switch i {
		case 1:
			receipt := types.NewReceipt(nil, false, new(big.Int))
			receipt.Logs = []*types.Log{{Address: addr}}
			gen.AddUncheckedReceipt(receipt)
		case 2:
			receipt := types.NewReceipt(nil, false, new(big.Int))
			receipt.Logs = []*types.Log{{Address: addr}}
			gen.AddUncheckedReceipt(receipt)
		}
	})
	// Persist the blocks, canonical markers, head pointer and receipts
	for i, block := range chain {
		core.WriteBlock(db, block)
		if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
			t.Fatalf("failed to insert block number: %v", err)
		}
		if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
			t.Fatalf("failed to insert block number: %v", err)
		}
		if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil {
			t.Fatal("error writing block receipts:", err)
		}
	}
	// Run the upgrade over the stored chain
	err := addMipmapBloomBins(db)
	if err != nil {
		t.Fatal(err)
	}
	// The level-0 mipmap bloom covering block 1 must have bits set
	bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0])
	if (bloom == types.Bloom{}) {
		t.Error("got empty bloom filter")
	}
	// The upgrade must also record its version marker
	data, _ := db.Get([]byte("setting-mipmap-version"))
	if len(data) == 0 {
		t.Error("setting-mipmap-version not written to database")
	}
}
|
@ -0,0 +1,142 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package eth |
||||||
|
|
||||||
|
import ( |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common" |
||||||
|
"github.com/ethereum/go-ethereum/common/bitutil" |
||||||
|
"github.com/ethereum/go-ethereum/core" |
||||||
|
"github.com/ethereum/go-ethereum/core/bloombits" |
||||||
|
"github.com/ethereum/go-ethereum/core/types" |
||||||
|
"github.com/ethereum/go-ethereum/ethdb" |
||||||
|
"github.com/ethereum/go-ethereum/params" |
||||||
|
) |
||||||
|
|
||||||
|
const (
	// bloomServiceThreads is the number of goroutines used globally by an Ethereum
	// instance to service bloombits lookups for all running filters.
	bloomServiceThreads = 16

	// bloomFilterThreads is the number of goroutines used locally per filter to
	// multiplex requests onto the global servicing goroutines.
	bloomFilterThreads = 3

	// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
	// in a single batch.
	bloomRetrievalBatch = 16

	// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
	// to accumulate before servicing an entire batch (avoiding hysteresis).
	bloomRetrievalWait = time.Duration(0)
)
||||||
|
|
||||||
|
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serving the data to satisfy.
func (eth *Ethereum) startBloomHandlers() {
	for i := 0; i < bloomServiceThreads; i++ {
		go func() {
			for {
				select {
				case <-eth.shutdownChan:
					// Node shutting down, terminate the service goroutine
					return

				case request := <-eth.bloomRequests:
					task := <-request

					task.Bitsets = make([][]byte, len(task.Sections))
					for i, section := range task.Sections {
						// Key the bit-vector lookup by the canonical hash of the
						// section's last block
						head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)
						blob, err := bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb, task.Bit, section, head), int(params.BloomBitsBlocks)/8)
						if err != nil {
							// NOTE(review): a corrupted stored bitset crashes the
							// node here — confirm this hard failure is intended
							panic(err)
						}
						task.Bitsets[i] = blob
					}
					request <- task
				}
			}
		}()
	}
}
||||||
|
|
||||||
|
const (
	// bloomConfirms is the number of confirmation blocks before a bloom section is
	// considered probably final and its rotated bits are calculated.
	bloomConfirms = 256

	// bloomThrottling is the time to wait between processing two consecutive index
	// sections. It's useful during chain upgrades to prevent disk overload.
	bloomThrottling = 100 * time.Millisecond
)
||||||
|
|
||||||
|
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
	size uint64 // section size to generate bloombits for

	db  ethdb.Database       // database instance to write index data and metadata into
	gen *bloombits.Generator // generator to rotate the bloom bits creating the bloom index

	section uint64      // section number currently being processed
	head    common.Hash // hash of the last header processed
}
||||||
|
|
||||||
|
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
|
||||||
|
// canonical chain for fast logs filtering.
|
||||||
|
func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer { |
||||||
|
backend := &BloomIndexer{ |
||||||
|
db: db, |
||||||
|
size: size, |
||||||
|
} |
||||||
|
table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix)) |
||||||
|
|
||||||
|
return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits") |
||||||
|
} |
||||||
|
|
||||||
|
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
|
||||||
|
// section.
|
||||||
|
func (b *BloomIndexer) Reset(section uint64) { |
||||||
|
gen, err := bloombits.NewGenerator(uint(b.size)) |
||||||
|
if err != nil { |
||||||
|
panic(err) |
||||||
|
} |
||||||
|
b.gen, b.section, b.head = gen, section, common.Hash{} |
||||||
|
} |
||||||
|
|
||||||
|
// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *BloomIndexer) Process(header *types.Header) {
	// The bloom's slot within the section is the header number offset from the
	// section's first block
	b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
	b.head = header.Hash()
}
||||||
|
|
||||||
|
// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *BloomIndexer) Commit() error {
	batch := b.db.NewBatch()

	// Write one compressed rotated bit-vector per bloom bit index, keyed by the
	// section number and its head hash
	for i := 0; i < types.BloomBitLength; i++ {
		bits, err := b.gen.Bitset(uint(i))
		if err != nil {
			return err
		}
		core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
	}
	return batch.Write()
}
@ -0,0 +1,201 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package filters |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"context" |
||||||
|
"fmt" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common" |
||||||
|
"github.com/ethereum/go-ethereum/common/bitutil" |
||||||
|
"github.com/ethereum/go-ethereum/core" |
||||||
|
"github.com/ethereum/go-ethereum/core/bloombits" |
||||||
|
"github.com/ethereum/go-ethereum/core/types" |
||||||
|
"github.com/ethereum/go-ethereum/ethdb" |
||||||
|
"github.com/ethereum/go-ethereum/event" |
||||||
|
"github.com/ethereum/go-ethereum/node" |
||||||
|
) |
||||||
|
|
||||||
|
// BenchmarkBloomBits512 benchmarks bloombits filtering with a 512-block section size.
func BenchmarkBloomBits512(b *testing.B) {
	benchmarkBloomBits(b, 512)
}
||||||
|
|
||||||
|
// BenchmarkBloomBits1k benchmarks bloombits filtering with a 1024-block section size.
func BenchmarkBloomBits1k(b *testing.B) {
	benchmarkBloomBits(b, 1024)
}
||||||
|
|
||||||
|
// BenchmarkBloomBits2k benchmarks bloombits filtering with a 2048-block section size.
func BenchmarkBloomBits2k(b *testing.B) {
	benchmarkBloomBits(b, 2048)
}
||||||
|
|
||||||
|
// BenchmarkBloomBits4k benchmarks bloombits filtering with a 4096-block section size.
func BenchmarkBloomBits4k(b *testing.B) {
	benchmarkBloomBits(b, 4096)
}
||||||
|
|
||||||
|
// BenchmarkBloomBits8k benchmarks bloombits filtering with a 8192-block section size.
func BenchmarkBloomBits8k(b *testing.B) {
	benchmarkBloomBits(b, 8192)
}
||||||
|
|
||||||
|
// BenchmarkBloomBits16k benchmarks bloombits filtering with a 16384-block section size.
func BenchmarkBloomBits16k(b *testing.B) {
	benchmarkBloomBits(b, 16384)
}
||||||
|
|
||||||
|
// BenchmarkBloomBits32k benchmarks bloombits filtering with a 32768-block section size.
func BenchmarkBloomBits32k(b *testing.B) {
	benchmarkBloomBits(b, 32768)
}
||||||
|
|
||||||
|
const benchFilterCnt = 2000 |
||||||
|
|
||||||
|
func benchmarkBloomBits(b *testing.B, sectionSize uint64) { |
||||||
|
benchDataDir := node.DefaultDataDir() + "/geth/chaindata" |
||||||
|
fmt.Println("Running bloombits benchmark section size:", sectionSize) |
||||||
|
|
||||||
|
db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024) |
||||||
|
if err != nil { |
||||||
|
b.Fatalf("error opening database at %v: %v", benchDataDir, err) |
||||||
|
} |
||||||
|
head := core.GetHeadBlockHash(db) |
||||||
|
if head == (common.Hash{}) { |
||||||
|
b.Fatalf("chain data not found at %v", benchDataDir) |
||||||
|
} |
||||||
|
|
||||||
|
clearBloomBits(db) |
||||||
|
fmt.Println("Generating bloombits data...") |
||||||
|
headNum := core.GetBlockNumber(db, head) |
||||||
|
if headNum < sectionSize+512 { |
||||||
|
b.Fatalf("not enough blocks for running a benchmark") |
||||||
|
} |
||||||
|
|
||||||
|
start := time.Now() |
||||||
|
cnt := (headNum - 512) / sectionSize |
||||||
|
var dataSize, compSize uint64 |
||||||
|
for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ { |
||||||
|
bc, err := bloombits.NewGenerator(uint(sectionSize)) |
||||||
|
if err != nil { |
||||||
|
b.Fatalf("failed to create generator: %v", err) |
||||||
|
} |
||||||
|
var header *types.Header |
||||||
|
for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ { |
||||||
|
hash := core.GetCanonicalHash(db, i) |
||||||
|
header = core.GetHeader(db, hash, i) |
||||||
|
if header == nil { |
||||||
|
b.Fatalf("Error creating bloomBits data") |
||||||
|
} |
||||||
|
bc.AddBloom(uint(i-sectionIdx*sectionSize), header.Bloom) |
||||||
|
} |
||||||
|
sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1) |
||||||
|
for i := 0; i < types.BloomBitLength; i++ { |
||||||
|
data, err := bc.Bitset(uint(i)) |
||||||
|
if err != nil { |
||||||
|
b.Fatalf("failed to retrieve bitset: %v", err) |
||||||
|
} |
||||||
|
comp := bitutil.CompressBytes(data) |
||||||
|
dataSize += uint64(len(data)) |
||||||
|
compSize += uint64(len(comp)) |
||||||
|
core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp) |
||||||
|
} |
||||||
|
//if sectionIdx%50 == 0 {
|
||||||
|
// fmt.Println(" section", sectionIdx, "/", cnt)
|
||||||
|
//}
|
||||||
|
} |
||||||
|
|
||||||
|
d := time.Since(start) |
||||||
|
fmt.Println("Finished generating bloombits data") |
||||||
|
fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block") |
||||||
|
fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize)) |
||||||
|
|
||||||
|
fmt.Println("Running filter benchmarks...") |
||||||
|
start = time.Now() |
||||||
|
mux := new(event.TypeMux) |
||||||
|
var backend *testBackend |
||||||
|
|
||||||
|
for i := 0; i < benchFilterCnt; i++ { |
||||||
|
if i%20 == 0 { |
||||||
|
db.Close() |
||||||
|
db, _ = ethdb.NewLDBDatabase(benchDataDir, 128, 1024) |
||||||
|
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} |
||||||
|
} |
||||||
|
var addr common.Address |
||||||
|
addr[0] = byte(i) |
||||||
|
addr[1] = byte(i / 256) |
||||||
|
filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) |
||||||
|
if _, err := filter.Logs(context.Background()); err != nil { |
||||||
|
b.Error("filter.Find error:", err) |
||||||
|
} |
||||||
|
} |
||||||
|
d = time.Since(start) |
||||||
|
fmt.Println("Finished running filter benchmarks") |
||||||
|
fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks") |
||||||
|
db.Close() |
||||||
|
} |
||||||
|
|
||||||
|
func forEachKey(db ethdb.Database, startPrefix, endPrefix []byte, fn func(key []byte)) { |
||||||
|
it := db.(*ethdb.LDBDatabase).NewIterator() |
||||||
|
it.Seek(startPrefix) |
||||||
|
for it.Valid() { |
||||||
|
key := it.Key() |
||||||
|
cmpLen := len(key) |
||||||
|
if len(endPrefix) < cmpLen { |
||||||
|
cmpLen = len(endPrefix) |
||||||
|
} |
||||||
|
if bytes.Compare(key[:cmpLen], endPrefix) == 1 { |
||||||
|
break |
||||||
|
} |
||||||
|
fn(common.CopyBytes(key)) |
||||||
|
it.Next() |
||||||
|
} |
||||||
|
it.Release() |
||||||
|
} |
||||||
|
|
||||||
|
var bloomBitsPrefix = []byte("bloomBits-") |
||||||
|
|
||||||
|
func clearBloomBits(db ethdb.Database) { |
||||||
|
fmt.Println("Clearing bloombits data...") |
||||||
|
forEachKey(db, bloomBitsPrefix, bloomBitsPrefix, func(key []byte) { |
||||||
|
db.Delete(key) |
||||||
|
}) |
||||||
|
} |
||||||
|
|
||||||
|
func BenchmarkNoBloomBits(b *testing.B) { |
||||||
|
benchDataDir := node.DefaultDataDir() + "/geth/chaindata" |
||||||
|
fmt.Println("Running benchmark without bloombits") |
||||||
|
db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024) |
||||||
|
if err != nil { |
||||||
|
b.Fatalf("error opening database at %v: %v", benchDataDir, err) |
||||||
|
} |
||||||
|
head := core.GetHeadBlockHash(db) |
||||||
|
if head == (common.Hash{}) { |
||||||
|
b.Fatalf("chain data not found at %v", benchDataDir) |
||||||
|
} |
||||||
|
headNum := core.GetBlockNumber(db, head) |
||||||
|
|
||||||
|
clearBloomBits(db) |
||||||
|
|
||||||
|
fmt.Println("Running filter benchmarks...") |
||||||
|
start := time.Now() |
||||||
|
mux := new(event.TypeMux) |
||||||
|
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} |
||||||
|
filter := New(backend, 0, int64(headNum), []common.Address{common.Address{}}, nil) |
||||||
|
filter.Logs(context.Background()) |
||||||
|
d := time.Since(start) |
||||||
|
fmt.Println("Finished running filter benchmarks") |
||||||
|
fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks") |
||||||
|
db.Close() |
||||||
|
} |
@ -0,0 +1,26 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package params |
||||||
|
|
||||||
|
// These are network parameters that need to be constant between clients, but
// aren't necessarily consensus related.

const (
	// BloomBitsBlocks is the number of blocks a single bloom bit section vector
	// contains.
	BloomBitsBlocks uint64 = 4096
)
Loading…
Reference in new issue