mirror of https://github.com/ethereum/go-ethereum
parent
90d75b03ea
commit
7fe099df4d
@ -1,92 +0,0 @@ |
||||
// Copyright 2021 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package core |
||||
|
||||
import ( |
||||
"context" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/common/bitutil" |
||||
"github.com/ethereum/go-ethereum/core/bloombits" |
||||
"github.com/ethereum/go-ethereum/core/rawdb" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/ethdb" |
||||
) |
||||
|
||||
const ( |
||||
// bloomThrottling is the time to wait between processing two consecutive index
|
||||
// sections. It's useful during chain upgrades to prevent disk overload.
|
||||
bloomThrottling = 100 * time.Millisecond |
||||
) |
||||
|
||||
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
|
||||
// for the Ethereum header bloom filters, permitting blazing fast filtering.
|
||||
type BloomIndexer struct { |
||||
size uint64 // section size to generate bloombits for
|
||||
db ethdb.Database // database instance to write index data and metadata into
|
||||
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
|
||||
section uint64 // Section is the section number being processed currently
|
||||
head common.Hash // Head is the hash of the last header processed
|
||||
} |
||||
|
||||
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
|
||||
// canonical chain for fast logs filtering.
|
||||
func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *ChainIndexer { |
||||
backend := &BloomIndexer{ |
||||
db: db, |
||||
size: size, |
||||
} |
||||
table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) |
||||
|
||||
return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") |
||||
} |
||||
|
||||
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
|
||||
// section.
|
||||
func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { |
||||
gen, err := bloombits.NewGenerator(uint(b.size)) |
||||
b.gen, b.section, b.head = gen, section, common.Hash{} |
||||
return err |
||||
} |
||||
|
||||
// Process implements core.ChainIndexerBackend, adding a new header's bloom into
|
||||
// the index.
|
||||
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { |
||||
b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) |
||||
b.head = header.Hash() |
||||
return nil |
||||
} |
||||
|
||||
// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
|
||||
// writing it out into the database.
|
||||
func (b *BloomIndexer) Commit() error { |
||||
batch := b.db.NewBatchWithSize((int(b.size) / 8) * types.BloomBitLength) |
||||
for i := 0; i < types.BloomBitLength; i++ { |
||||
bits, err := b.gen.Bitset(uint(i)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) |
||||
} |
||||
return batch.Write() |
||||
} |
||||
|
||||
// Prune returns an empty error since we don't support pruning here.
|
||||
func (b *BloomIndexer) Prune(threshold uint64) error { |
||||
return nil |
||||
} |
@ -1,18 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package bloombits implements bloom filtering on batches of data.
|
||||
package bloombits |
@ -1,98 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bloombits |
||||
|
||||
import ( |
||||
"errors" |
||||
|
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
) |
||||
|
||||
var ( |
||||
// errSectionOutOfBounds is returned if the user tried to add more bloom filters
|
||||
// to the batch than available space, or if tries to retrieve above the capacity.
|
||||
errSectionOutOfBounds = errors.New("section out of bounds") |
||||
|
||||
// errBloomBitOutOfBounds is returned if the user tried to retrieve specified
|
||||
// bit bloom above the capacity.
|
||||
errBloomBitOutOfBounds = errors.New("bloom bit out of bounds") |
||||
) |
||||
|
||||
// Generator takes a number of bloom filters and generates the rotated bloom bits
|
||||
// to be used for batched filtering.
|
||||
type Generator struct { |
||||
blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
|
||||
sections uint // Number of sections to batch together
|
||||
nextSec uint // Next section to set when adding a bloom
|
||||
} |
||||
|
||||
// NewGenerator creates a rotated bloom generator that can iteratively fill a
|
||||
// batched bloom filter's bits.
|
||||
func NewGenerator(sections uint) (*Generator, error) { |
||||
if sections%8 != 0 { |
||||
return nil, errors.New("section count not multiple of 8") |
||||
} |
||||
b := &Generator{sections: sections} |
||||
for i := 0; i < types.BloomBitLength; i++ { |
||||
b.blooms[i] = make([]byte, sections/8) |
||||
} |
||||
return b, nil |
||||
} |
||||
|
||||
// AddBloom takes a single bloom filter and sets the corresponding bit column
|
||||
// in memory accordingly.
|
||||
func (b *Generator) AddBloom(index uint, bloom types.Bloom) error { |
||||
// Make sure we're not adding more bloom filters than our capacity
|
||||
if b.nextSec >= b.sections { |
||||
return errSectionOutOfBounds |
||||
} |
||||
if b.nextSec != index { |
||||
return errors.New("bloom filter with unexpected index") |
||||
} |
||||
// Rotate the bloom and insert into our collection
|
||||
byteIndex := b.nextSec / 8 |
||||
bitIndex := byte(7 - b.nextSec%8) |
||||
for byt := 0; byt < types.BloomByteLength; byt++ { |
||||
bloomByte := bloom[types.BloomByteLength-1-byt] |
||||
if bloomByte == 0 { |
||||
continue |
||||
} |
||||
base := 8 * byt |
||||
b.blooms[base+7][byteIndex] |= ((bloomByte >> 7) & 1) << bitIndex |
||||
b.blooms[base+6][byteIndex] |= ((bloomByte >> 6) & 1) << bitIndex |
||||
b.blooms[base+5][byteIndex] |= ((bloomByte >> 5) & 1) << bitIndex |
||||
b.blooms[base+4][byteIndex] |= ((bloomByte >> 4) & 1) << bitIndex |
||||
b.blooms[base+3][byteIndex] |= ((bloomByte >> 3) & 1) << bitIndex |
||||
b.blooms[base+2][byteIndex] |= ((bloomByte >> 2) & 1) << bitIndex |
||||
b.blooms[base+1][byteIndex] |= ((bloomByte >> 1) & 1) << bitIndex |
||||
b.blooms[base][byteIndex] |= (bloomByte & 1) << bitIndex |
||||
} |
||||
b.nextSec++ |
||||
return nil |
||||
} |
||||
|
||||
// Bitset returns the bit vector belonging to the given bit index after all
|
||||
// blooms have been added.
|
||||
func (b *Generator) Bitset(idx uint) ([]byte, error) { |
||||
if b.nextSec != b.sections { |
||||
return nil, errors.New("bloom not fully generated yet") |
||||
} |
||||
if idx >= types.BloomBitLength { |
||||
return nil, errBloomBitOutOfBounds |
||||
} |
||||
return b.blooms[idx], nil |
||||
} |
@ -1,100 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bloombits |
||||
|
||||
import ( |
||||
"bytes" |
||||
crand "crypto/rand" |
||||
"math/rand" |
||||
"testing" |
||||
|
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
) |
||||
|
||||
// Tests that batched bloom bits are correctly rotated from the input bloom
|
||||
// filters.
|
||||
func TestGenerator(t *testing.T) { |
||||
// Generate the input and the rotated output
|
||||
var input, output [types.BloomBitLength][types.BloomByteLength]byte |
||||
|
||||
for i := 0; i < types.BloomBitLength; i++ { |
||||
for j := 0; j < types.BloomBitLength; j++ { |
||||
bit := byte(rand.Int() % 2) |
||||
|
||||
input[i][j/8] |= bit << byte(7-j%8) |
||||
output[types.BloomBitLength-1-j][i/8] |= bit << byte(7-i%8) |
||||
} |
||||
} |
||||
// Crunch the input through the generator and verify the result
|
||||
gen, err := NewGenerator(types.BloomBitLength) |
||||
if err != nil { |
||||
t.Fatalf("failed to create bloombit generator: %v", err) |
||||
} |
||||
for i, bloom := range input { |
||||
if err := gen.AddBloom(uint(i), bloom); err != nil { |
||||
t.Fatalf("bloom %d: failed to add: %v", i, err) |
||||
} |
||||
} |
||||
for i, want := range output { |
||||
have, err := gen.Bitset(uint(i)) |
||||
if err != nil { |
||||
t.Fatalf("output %d: failed to retrieve bits: %v", i, err) |
||||
} |
||||
if !bytes.Equal(have, want[:]) { |
||||
t.Errorf("output %d: bit vector mismatch have %x, want %x", i, have, want) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func BenchmarkGenerator(b *testing.B) { |
||||
var input [types.BloomBitLength][types.BloomByteLength]byte |
||||
b.Run("empty", func(b *testing.B) { |
||||
b.ReportAllocs() |
||||
b.ResetTimer() |
||||
for i := 0; i < b.N; i++ { |
||||
// Crunch the input through the generator and verify the result
|
||||
gen, err := NewGenerator(types.BloomBitLength) |
||||
if err != nil { |
||||
b.Fatalf("failed to create bloombit generator: %v", err) |
||||
} |
||||
for j, bloom := range &input { |
||||
if err := gen.AddBloom(uint(j), bloom); err != nil { |
||||
b.Fatalf("bloom %d: failed to add: %v", i, err) |
||||
} |
||||
} |
||||
} |
||||
}) |
||||
for i := 0; i < types.BloomBitLength; i++ { |
||||
crand.Read(input[i][:]) |
||||
} |
||||
b.Run("random", func(b *testing.B) { |
||||
b.ReportAllocs() |
||||
b.ResetTimer() |
||||
for i := 0; i < b.N; i++ { |
||||
// Crunch the input through the generator and verify the result
|
||||
gen, err := NewGenerator(types.BloomBitLength) |
||||
if err != nil { |
||||
b.Fatalf("failed to create bloombit generator: %v", err) |
||||
} |
||||
for j, bloom := range &input { |
||||
if err := gen.AddBloom(uint(j), bloom); err != nil { |
||||
b.Fatalf("bloom %d: failed to add: %v", i, err) |
||||
} |
||||
} |
||||
} |
||||
}) |
||||
} |
@ -1,649 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bloombits |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"errors" |
||||
"math" |
||||
"sort" |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common/bitutil" |
||||
"github.com/ethereum/go-ethereum/crypto" |
||||
) |
||||
|
||||
// bloomIndexes represents the bit indexes inside the bloom filter that belong
|
||||
// to some key.
|
||||
type bloomIndexes [3]uint |
||||
|
||||
// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
|
||||
func calcBloomIndexes(b []byte) bloomIndexes { |
||||
b = crypto.Keccak256(b) |
||||
|
||||
var idxs bloomIndexes |
||||
for i := 0; i < len(idxs); i++ { |
||||
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1]) |
||||
} |
||||
return idxs |
||||
} |
||||
|
||||
// partialMatches with a non-nil vector represents a section in which some sub-
|
||||
// matchers have already found potential matches. Subsequent sub-matchers will
|
||||
// binary AND their matches with this vector. If vector is nil, it represents a
|
||||
// section to be processed by the first sub-matcher.
|
||||
type partialMatches struct { |
||||
section uint64 |
||||
bitset []byte |
||||
} |
||||
|
||||
// Retrieval represents a request for retrieval task assignments for a given
|
||||
// bit with the given number of fetch elements, or a response for such a request.
|
||||
// It can also have the actual results set to be used as a delivery data struct.
|
||||
//
|
||||
// The context and error fields are used by the light client to terminate matching
|
||||
// early if an error is encountered on some path of the pipeline.
|
||||
type Retrieval struct { |
||||
Bit uint |
||||
Sections []uint64 |
||||
Bitsets [][]byte |
||||
|
||||
Context context.Context |
||||
Error error |
||||
} |
||||
|
||||
// Matcher is a pipelined system of schedulers and logic matchers which perform
|
||||
// binary AND/OR operations on the bit-streams, creating a stream of potential
|
||||
// blocks to inspect for data content.
|
||||
type Matcher struct { |
||||
sectionSize uint64 // Size of the data batches to filter on
|
||||
|
||||
filters [][]bloomIndexes // Filter the system is matching for
|
||||
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
|
||||
|
||||
retrievers chan chan uint // Retriever processes waiting for bit allocations
|
||||
counters chan chan uint // Retriever processes waiting for task count reports
|
||||
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
|
||||
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
|
||||
|
||||
running atomic.Bool // Atomic flag whether a session is live or not
|
||||
} |
||||
|
||||
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
|
||||
// address and topic filtering on them. Setting a filter component to `nil` is
|
||||
// allowed and will result in that filter rule being skipped (OR 0x11...1).
|
||||
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher { |
||||
// Create the matcher instance
|
||||
m := &Matcher{ |
||||
sectionSize: sectionSize, |
||||
schedulers: make(map[uint]*scheduler), |
||||
retrievers: make(chan chan uint), |
||||
counters: make(chan chan uint), |
||||
retrievals: make(chan chan *Retrieval), |
||||
deliveries: make(chan *Retrieval), |
||||
} |
||||
// Calculate the bloom bit indexes for the groups we're interested in
|
||||
m.filters = nil |
||||
|
||||
for _, filter := range filters { |
||||
// Gather the bit indexes of the filter rule, special casing the nil filter
|
||||
if len(filter) == 0 { |
||||
continue |
||||
} |
||||
bloomBits := make([]bloomIndexes, len(filter)) |
||||
for i, clause := range filter { |
||||
if clause == nil { |
||||
bloomBits = nil |
||||
break |
||||
} |
||||
bloomBits[i] = calcBloomIndexes(clause) |
||||
} |
||||
// Accumulate the filter rules if no nil rule was within
|
||||
if bloomBits != nil { |
||||
m.filters = append(m.filters, bloomBits) |
||||
} |
||||
} |
||||
// For every bit, create a scheduler to load/download the bit vectors
|
||||
for _, bloomIndexLists := range m.filters { |
||||
for _, bloomIndexList := range bloomIndexLists { |
||||
for _, bloomIndex := range bloomIndexList { |
||||
m.addScheduler(bloomIndex) |
||||
} |
||||
} |
||||
} |
||||
return m |
||||
} |
||||
|
||||
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
|
||||
// it has not existed before. If the bit is already selected for filtering, the
|
||||
// existing scheduler can be used.
|
||||
func (m *Matcher) addScheduler(idx uint) { |
||||
if _, ok := m.schedulers[idx]; ok { |
||||
return |
||||
} |
||||
m.schedulers[idx] = newScheduler(idx) |
||||
} |
||||
|
||||
// Start starts the matching process and returns a stream of bloom matches in
|
||||
// a given range of blocks. If there are no more matches in the range, the result
|
||||
// channel is closed.
|
||||
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { |
||||
// Make sure we're not creating concurrent sessions
|
||||
if m.running.Swap(true) { |
||||
return nil, errors.New("matcher already running") |
||||
} |
||||
defer m.running.Store(false) |
||||
|
||||
// Initiate a new matching round
|
||||
session := &MatcherSession{ |
||||
matcher: m, |
||||
quit: make(chan struct{}), |
||||
ctx: ctx, |
||||
} |
||||
for _, scheduler := range m.schedulers { |
||||
scheduler.reset() |
||||
} |
||||
sink := m.run(begin, end, cap(results), session) |
||||
|
||||
// Read the output from the result sink and deliver to the user
|
||||
session.pend.Add(1) |
||||
go func() { |
||||
defer session.pend.Done() |
||||
defer close(results) |
||||
|
||||
for { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
|
||||
case res, ok := <-sink: |
||||
// New match result found
|
||||
if !ok { |
||||
return |
||||
} |
||||
// Calculate the first and last blocks of the section
|
||||
sectionStart := res.section * m.sectionSize |
||||
|
||||
first := sectionStart |
||||
if begin > first { |
||||
first = begin |
||||
} |
||||
last := sectionStart + m.sectionSize - 1 |
||||
if end < last { |
||||
last = end |
||||
} |
||||
// Iterate over all the blocks in the section and return the matching ones
|
||||
for i := first; i <= last; i++ { |
||||
// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
|
||||
next := res.bitset[(i-sectionStart)/8] |
||||
if next == 0 { |
||||
if i%8 == 0 { |
||||
i += 7 |
||||
} |
||||
continue |
||||
} |
||||
// Some bit it set, do the actual submatching
|
||||
if bit := 7 - i%8; next&(1<<bit) != 0 { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
case results <- i: |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
}() |
||||
return session, nil |
||||
} |
||||
|
||||
// run creates a daisy-chain of sub-matchers, one for the address set and one
|
||||
// for each topic set, each sub-matcher receiving a section only if the previous
|
||||
// ones have all found a potential match in one of the blocks of the section,
|
||||
// then binary AND-ing its own matches and forwarding the result to the next one.
|
||||
//
|
||||
// The method starts feeding the section indexes into the first sub-matcher on a
|
||||
// new goroutine and returns a sink channel receiving the results.
|
||||
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches { |
||||
// Create the source channel and feed section indexes into
|
||||
source := make(chan *partialMatches, buffer) |
||||
|
||||
session.pend.Add(1) |
||||
go func() { |
||||
defer session.pend.Done() |
||||
defer close(source) |
||||
|
||||
for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}: |
||||
} |
||||
} |
||||
}() |
||||
// Assemble the daisy-chained filtering pipeline
|
||||
next := source |
||||
dist := make(chan *request, buffer) |
||||
|
||||
for _, bloom := range m.filters { |
||||
next = m.subMatch(next, dist, bloom, session) |
||||
} |
||||
// Start the request distribution
|
||||
session.pend.Add(1) |
||||
go m.distributor(dist, session) |
||||
|
||||
return next |
||||
} |
||||
|
||||
// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
|
||||
// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
|
||||
// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
|
||||
// that address/topic, and binary AND-ing those vectors together.
|
||||
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches { |
||||
// Start the concurrent schedulers for each bit required by the bloom filter
|
||||
sectionSources := make([][3]chan uint64, len(bloom)) |
||||
sectionSinks := make([][3]chan []byte, len(bloom)) |
||||
for i, bits := range bloom { |
||||
for j, bit := range bits { |
||||
sectionSources[i][j] = make(chan uint64, cap(source)) |
||||
sectionSinks[i][j] = make(chan []byte, cap(source)) |
||||
|
||||
m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend) |
||||
} |
||||
} |
||||
|
||||
process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
|
||||
results := make(chan *partialMatches, cap(source)) |
||||
|
||||
session.pend.Add(2) |
||||
go func() { |
||||
// Tear down the goroutine and terminate all source channels
|
||||
defer session.pend.Done() |
||||
defer close(process) |
||||
|
||||
defer func() { |
||||
for _, bloomSources := range sectionSources { |
||||
for _, bitSource := range bloomSources { |
||||
close(bitSource) |
||||
} |
||||
} |
||||
}() |
||||
// Read sections from the source channel and multiplex into all bit-schedulers
|
||||
for { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
|
||||
case subres, ok := <-source: |
||||
// New subresult from previous link
|
||||
if !ok { |
||||
return |
||||
} |
||||
// Multiplex the section index to all bit-schedulers
|
||||
for _, bloomSources := range sectionSources { |
||||
for _, bitSource := range bloomSources { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
case bitSource <- subres.section: |
||||
} |
||||
} |
||||
} |
||||
// Notify the processor that this section will become available
|
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
case process <- subres: |
||||
} |
||||
} |
||||
} |
||||
}() |
||||
|
||||
go func() { |
||||
// Tear down the goroutine and terminate the final sink channel
|
||||
defer session.pend.Done() |
||||
defer close(results) |
||||
|
||||
// Read the source notifications and collect the delivered results
|
||||
for { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
|
||||
case subres, ok := <-process: |
||||
// Notified of a section being retrieved
|
||||
if !ok { |
||||
return |
||||
} |
||||
// Gather all the sub-results and merge them together
|
||||
var orVector []byte |
||||
for _, bloomSinks := range sectionSinks { |
||||
var andVector []byte |
||||
for _, bitSink := range bloomSinks { |
||||
var data []byte |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
case data = <-bitSink: |
||||
} |
||||
if andVector == nil { |
||||
andVector = make([]byte, int(m.sectionSize/8)) |
||||
copy(andVector, data) |
||||
} else { |
||||
bitutil.ANDBytes(andVector, andVector, data) |
||||
} |
||||
} |
||||
if orVector == nil { |
||||
orVector = andVector |
||||
} else { |
||||
bitutil.ORBytes(orVector, orVector, andVector) |
||||
} |
||||
} |
||||
|
||||
if orVector == nil { |
||||
orVector = make([]byte, int(m.sectionSize/8)) |
||||
} |
||||
if subres.bitset != nil { |
||||
bitutil.ANDBytes(orVector, orVector, subres.bitset) |
||||
} |
||||
if bitutil.TestBytes(orVector) { |
||||
select { |
||||
case <-session.quit: |
||||
return |
||||
case results <- &partialMatches{subres.section, orVector}: |
||||
} |
||||
} |
||||
} |
||||
} |
||||
}() |
||||
return results |
||||
} |
||||
|
||||
// distributor receives requests from the schedulers and queues them into a set
|
||||
// of pending requests, which are assigned to retrievers wanting to fulfil them.
|
||||
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { |
||||
defer session.pend.Done() |
||||
|
||||
var ( |
||||
requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
|
||||
unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
|
||||
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
|
||||
allocs int // Number of active allocations to handle graceful shutdown requests
|
||||
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
|
||||
) |
||||
|
||||
// assign is a helper method to try to assign a pending bit an actively
|
||||
// listening servicer, or schedule it up for later when one arrives.
|
||||
assign := func(bit uint) { |
||||
select { |
||||
case fetcher := <-m.retrievers: |
||||
allocs++ |
||||
fetcher <- bit |
||||
default: |
||||
// No retrievers active, start listening for new ones
|
||||
retrievers = m.retrievers |
||||
unallocs[bit] = struct{}{} |
||||
} |
||||
} |
||||
|
||||
for { |
||||
select { |
||||
case <-shutdown: |
||||
// Shutdown requested. No more retrievers can be allocated,
|
||||
// but we still need to wait until all pending requests have returned.
|
||||
shutdown = nil |
||||
if allocs == 0 { |
||||
return |
||||
} |
||||
|
||||
case req := <-dist: |
||||
// New retrieval request arrived to be distributed to some fetcher process
|
||||
queue := requests[req.bit] |
||||
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section }) |
||||
requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...) |
||||
|
||||
// If it's a new bit and we have waiting fetchers, allocate to them
|
||||
if len(queue) == 0 { |
||||
assign(req.bit) |
||||
} |
||||
|
||||
case fetcher := <-retrievers: |
||||
// New retriever arrived, find the lowest section-ed bit to assign
|
||||
bit, best := uint(0), uint64(math.MaxUint64) |
||||
for idx := range unallocs { |
||||
if requests[idx][0] < best { |
||||
bit, best = idx, requests[idx][0] |
||||
} |
||||
} |
||||
// Stop tracking this bit (and alloc notifications if no more work is available)
|
||||
delete(unallocs, bit) |
||||
if len(unallocs) == 0 { |
||||
retrievers = nil |
||||
} |
||||
allocs++ |
||||
fetcher <- bit |
||||
|
||||
case fetcher := <-m.counters: |
||||
// New task count request arrives, return number of items
|
||||
fetcher <- uint(len(requests[<-fetcher])) |
||||
|
||||
case fetcher := <-m.retrievals: |
||||
// New fetcher waiting for tasks to retrieve, assign
|
||||
task := <-fetcher |
||||
if want := len(task.Sections); want >= len(requests[task.Bit]) { |
||||
task.Sections = requests[task.Bit] |
||||
delete(requests, task.Bit) |
||||
} else { |
||||
task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...) |
||||
requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...) |
||||
} |
||||
fetcher <- task |
||||
|
||||
// If anything was left unallocated, try to assign to someone else
|
||||
if len(requests[task.Bit]) > 0 { |
||||
assign(task.Bit) |
||||
} |
||||
|
||||
case result := <-m.deliveries: |
||||
// New retrieval task response from fetcher, split out missing sections and
|
||||
// deliver complete ones
|
||||
var ( |
||||
sections = make([]uint64, 0, len(result.Sections)) |
||||
bitsets = make([][]byte, 0, len(result.Bitsets)) |
||||
missing = make([]uint64, 0, len(result.Sections)) |
||||
) |
||||
for i, bitset := range result.Bitsets { |
||||
if len(bitset) == 0 { |
||||
missing = append(missing, result.Sections[i]) |
||||
continue |
||||
} |
||||
sections = append(sections, result.Sections[i]) |
||||
bitsets = append(bitsets, bitset) |
||||
} |
||||
m.schedulers[result.Bit].deliver(sections, bitsets) |
||||
allocs-- |
||||
|
||||
// Reschedule missing sections and allocate bit if newly available
|
||||
if len(missing) > 0 { |
||||
queue := requests[result.Bit] |
||||
for _, section := range missing { |
||||
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section }) |
||||
queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...) |
||||
} |
||||
requests[result.Bit] = queue |
||||
|
||||
if len(queue) == len(missing) { |
||||
assign(result.Bit) |
||||
} |
||||
} |
||||
|
||||
// End the session when all pending deliveries have arrived.
|
||||
if shutdown == nil && allocs == 0 { |
||||
return |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// MatcherSession is returned by a started matcher to be used as a terminator
|
||||
// for the actively running matching operation.
|
||||
type MatcherSession struct { |
||||
matcher *Matcher |
||||
|
||||
closer sync.Once // Sync object to ensure we only ever close once
|
||||
quit chan struct{} // Quit channel to request pipeline termination
|
||||
|
||||
ctx context.Context // Context used by the light client to abort filtering
|
||||
err error // Global error to track retrieval failures deep in the chain
|
||||
errLock sync.Mutex |
||||
|
||||
pend sync.WaitGroup |
||||
} |
||||
|
||||
// Close stops the matching process and waits for all subprocesses to terminate
|
||||
// before returning. The timeout may be used for graceful shutdown, allowing the
|
||||
// currently running retrievals to complete before this time.
|
||||
func (s *MatcherSession) Close() { |
||||
s.closer.Do(func() { |
||||
// Signal termination and wait for all goroutines to tear down
|
||||
close(s.quit) |
||||
s.pend.Wait() |
||||
}) |
||||
} |
||||
|
||||
// Error returns any failure encountered during the matching session.
|
||||
func (s *MatcherSession) Error() error { |
||||
s.errLock.Lock() |
||||
defer s.errLock.Unlock() |
||||
|
||||
return s.err |
||||
} |
||||
|
||||
// allocateRetrieval assigns a bloom bit index to a client process that can either
|
||||
// immediately request and fetch the section contents assigned to this bit or wait
|
||||
// a little while for more sections to be requested.
|
||||
func (s *MatcherSession) allocateRetrieval() (uint, bool) { |
||||
fetcher := make(chan uint) |
||||
|
||||
select { |
||||
case <-s.quit: |
||||
return 0, false |
||||
case s.matcher.retrievers <- fetcher: |
||||
bit, ok := <-fetcher |
||||
return bit, ok |
||||
} |
||||
} |
||||
|
||||
// pendingSections returns the number of pending section retrievals belonging to
|
||||
// the given bloom bit index.
|
||||
func (s *MatcherSession) pendingSections(bit uint) int { |
||||
fetcher := make(chan uint) |
||||
|
||||
select { |
||||
case <-s.quit: |
||||
return 0 |
||||
case s.matcher.counters <- fetcher: |
||||
fetcher <- bit |
||||
return int(<-fetcher) |
||||
} |
||||
} |
||||
|
||||
// allocateSections assigns all or part of an already allocated bit-task queue
|
||||
// to the requesting process.
|
||||
func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 { |
||||
fetcher := make(chan *Retrieval) |
||||
|
||||
select { |
||||
case <-s.quit: |
||||
return nil |
||||
case s.matcher.retrievals <- fetcher: |
||||
task := &Retrieval{ |
||||
Bit: bit, |
||||
Sections: make([]uint64, count), |
||||
} |
||||
fetcher <- task |
||||
return (<-fetcher).Sections |
||||
} |
||||
} |
||||
|
||||
// deliverSections delivers a batch of section bit-vectors for a specific bloom
|
||||
// bit index to be injected into the processing pipeline.
|
||||
func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) { |
||||
s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets} |
||||
} |
||||
|
||||
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
|
||||
// the requested retrieval queue to be serviced together with other sessions.
|
||||
//
|
||||
// This method will block for the lifetime of the session. Even after termination
|
||||
// of the session, any request in-flight need to be responded to! Empty responses
|
||||
// are fine though in that case.
|
||||
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { |
||||
waitTimer := time.NewTimer(wait) |
||||
defer waitTimer.Stop() |
||||
|
||||
for { |
||||
// Allocate a new bloom bit index to retrieve data for, stopping when done
|
||||
bit, ok := s.allocateRetrieval() |
||||
if !ok { |
||||
return |
||||
} |
||||
// Bit allocated, throttle a bit if we're below our batch limit
|
||||
if s.pendingSections(bit) < batch { |
||||
waitTimer.Reset(wait) |
||||
select { |
||||
case <-s.quit: |
||||
// Session terminating, we can't meaningfully service, abort
|
||||
s.allocateSections(bit, 0) |
||||
s.deliverSections(bit, []uint64{}, [][]byte{}) |
||||
return |
||||
|
||||
case <-waitTimer.C: |
||||
// Throttling up, fetch whatever is available
|
||||
} |
||||
} |
||||
// Allocate as much as we can handle and request servicing
|
||||
sections := s.allocateSections(bit, batch) |
||||
request := make(chan *Retrieval) |
||||
|
||||
select { |
||||
case <-s.quit: |
||||
// Session terminating, we can't meaningfully service, abort
|
||||
s.deliverSections(bit, sections, make([][]byte, len(sections))) |
||||
return |
||||
|
||||
case mux <- request: |
||||
// Retrieval accepted, something must arrive before we're aborting
|
||||
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx} |
||||
|
||||
result := <-request |
||||
|
||||
// Deliver a result before s.Close() to avoid a deadlock
|
||||
s.deliverSections(result.Bit, result.Sections, result.Bitsets) |
||||
|
||||
if result.Error != nil { |
||||
s.errLock.Lock() |
||||
s.err = result.Error |
||||
s.errLock.Unlock() |
||||
s.Close() |
||||
} |
||||
} |
||||
} |
||||
} |
@ -1,292 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bloombits |
||||
|
||||
import ( |
||||
"context" |
||||
"math/rand" |
||||
"sync/atomic" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
) |
||||
|
||||
const testSectionSize = 4096 |
||||
|
||||
// Tests that wildcard filter rules (nil) can be specified and are handled well.
|
||||
func TestMatcherWildcards(t *testing.T) { |
||||
t.Parallel() |
||||
matcher := NewMatcher(testSectionSize, [][][]byte{ |
||||
{common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
|
||||
{common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard
|
||||
{common.Hash{0x01}.Bytes()}, // Plain rule, sanity check
|
||||
{common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule
|
||||
{nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule
|
||||
{nil, nil}, // Wildcard combo, drop rule
|
||||
{}, // Inited wildcard rule, drop rule
|
||||
nil, // Proper wildcard rule, drop rule
|
||||
}) |
||||
if len(matcher.filters) != 3 { |
||||
t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3) |
||||
} |
||||
if len(matcher.filters[0]) != 2 { |
||||
t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2) |
||||
} |
||||
if len(matcher.filters[1]) != 2 { |
||||
t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2) |
||||
} |
||||
if len(matcher.filters[2]) != 1 { |
||||
t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1) |
||||
} |
||||
} |
||||
|
||||
// Tests the matcher pipeline on a single continuous workflow without interrupts.
|
||||
func TestMatcherContinuous(t *testing.T) { |
||||
t.Parallel() |
||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75) |
||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81) |
||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36) |
||||
} |
||||
|
||||
// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
|
||||
// with the aim of ensuring data items are requested only once.
|
||||
func TestMatcherIntermittent(t *testing.T) { |
||||
t.Parallel() |
||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75) |
||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81) |
||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36) |
||||
} |
||||
|
||||
// Tests the matcher pipeline on random input to hopefully catch anomalies.
|
||||
func TestMatcherRandom(t *testing.T) { |
||||
t.Parallel() |
||||
for i := 0; i < 10; i++ { |
||||
testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0) |
||||
testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0) |
||||
testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0) |
||||
testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0) |
||||
testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0) |
||||
} |
||||
} |
||||
|
||||
// Tests that the matcher can properly find matches if the starting block is
|
||||
// shifted from a multiple of 8. This is needed to cover an optimisation with
|
||||
// bitset matching https://github.com/ethereum/go-ethereum/issues/15309.
|
||||
func TestMatcherShifted(t *testing.T) { |
||||
t.Parallel() |
||||
// Block 0 always matches in the tests, skip ahead of first 8 blocks with the
|
||||
// start to get a potential zero byte in the matcher bitset.
|
||||
|
||||
// To keep the second bitset byte zero, the filter must only match for the first
|
||||
// time in block 16, so doing an all-16 bit filter should suffice.
|
||||
|
||||
// To keep the starting block non divisible by 8, block number 9 is the first
|
||||
// that would introduce a shift and not match block 0.
|
||||
testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0) |
||||
} |
||||
|
||||
// Tests that matching on everything doesn't crash (special case internally).
|
||||
func TestWildcardMatcher(t *testing.T) { |
||||
t.Parallel() |
||||
testMatcherBothModes(t, nil, 0, 10000, 0) |
||||
} |
||||
|
||||
// makeRandomIndexes generates a random filter system, composed of multiple filter
|
||||
// criteria, each having one bloom list component for the address and arbitrarily
|
||||
// many topic bloom list components.
|
||||
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes { |
||||
res := make([][]bloomIndexes, len(lengths)) |
||||
for i, topics := range lengths { |
||||
res[i] = make([]bloomIndexes, topics) |
||||
for j := 0; j < topics; j++ { |
||||
for k := 0; k < len(res[i][j]); k++ { |
||||
res[i][j][k] = uint(rand.Intn(max-1) + 2) |
||||
} |
||||
} |
||||
} |
||||
return res |
||||
} |
||||
|
||||
// testMatcherDiffBatches runs the given matches test in single-delivery and also
|
||||
// in batches delivery mode, verifying that all kinds of deliveries are handled
|
||||
// correctly within.
|
||||
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) { |
||||
singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1) |
||||
batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16) |
||||
|
||||
if singleton != batched { |
||||
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in singleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched) |
||||
} |
||||
} |
||||
|
||||
// testMatcherBothModes runs the given matcher test in both continuous as well as
|
||||
// in intermittent mode, verifying that the request counts match each other.
|
||||
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) { |
||||
continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16) |
||||
intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16) |
||||
|
||||
if continuous != intermittent { |
||||
t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent) |
||||
} |
||||
} |
||||
|
||||
// testMatcher is a generic tester to run the given matcher test and return the
|
||||
// number of requests made for cross validation between different modes.
|
||||
func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 { |
||||
// Create a new matcher an simulate our explicit random bitsets
|
||||
matcher := NewMatcher(testSectionSize, nil) |
||||
matcher.filters = filter |
||||
|
||||
for _, rule := range filter { |
||||
for _, topic := range rule { |
||||
for _, bit := range topic { |
||||
matcher.addScheduler(bit) |
||||
} |
||||
} |
||||
} |
||||
// Track the number of retrieval requests made
|
||||
var requested atomic.Uint32 |
||||
|
||||
// Start the matching session for the filter and the retriever goroutines
|
||||
quit := make(chan struct{}) |
||||
matches := make(chan uint64, 16) |
||||
|
||||
session, err := matcher.Start(context.Background(), start, blocks-1, matches) |
||||
if err != nil { |
||||
t.Fatalf("failed to stat matcher session: %v", err) |
||||
} |
||||
startRetrievers(session, quit, &requested, maxReqCount) |
||||
|
||||
// Iterate over all the blocks and verify that the pipeline produces the correct matches
|
||||
for i := start; i < blocks; i++ { |
||||
if expMatch3(filter, i) { |
||||
match, ok := <-matches |
||||
if !ok { |
||||
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i) |
||||
return 0 |
||||
} |
||||
if match != i { |
||||
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match) |
||||
} |
||||
// If we're testing intermittent mode, abort and restart the pipeline
|
||||
if intermittent { |
||||
session.Close() |
||||
close(quit) |
||||
|
||||
quit = make(chan struct{}) |
||||
matches = make(chan uint64, 16) |
||||
|
||||
session, err = matcher.Start(context.Background(), i+1, blocks-1, matches) |
||||
if err != nil { |
||||
t.Fatalf("failed to stat matcher session: %v", err) |
||||
} |
||||
startRetrievers(session, quit, &requested, maxReqCount) |
||||
} |
||||
} |
||||
} |
||||
// Ensure the result channel is torn down after the last block
|
||||
match, ok := <-matches |
||||
if ok { |
||||
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match) |
||||
} |
||||
// Clean up the session and ensure we match the expected retrieval count
|
||||
session.Close() |
||||
close(quit) |
||||
|
||||
if retrievals != 0 && requested.Load() != retrievals { |
||||
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals) |
||||
} |
||||
return requested.Load() |
||||
} |
||||
|
||||
// startRetrievers starts a batch of goroutines listening for section requests
|
||||
// and serving them.
|
||||
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) { |
||||
requests := make(chan chan *Retrieval) |
||||
|
||||
for i := 0; i < 10; i++ { |
||||
// Start a multiplexer to test multiple threaded execution
|
||||
go session.Multiplex(batch, 100*time.Microsecond, requests) |
||||
|
||||
// Start a services to match the above multiplexer
|
||||
go func() { |
||||
for { |
||||
// Wait for a service request or a shutdown
|
||||
select { |
||||
case <-quit: |
||||
return |
||||
|
||||
case request := <-requests: |
||||
task := <-request |
||||
|
||||
task.Bitsets = make([][]byte, len(task.Sections)) |
||||
for i, section := range task.Sections { |
||||
if rand.Int()%4 != 0 { // Handle occasional missing deliveries
|
||||
task.Bitsets[i] = generateBitset(task.Bit, section) |
||||
retrievals.Add(1) |
||||
} |
||||
} |
||||
request <- task |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
} |
||||
|
||||
// generateBitset generates the rotated bitset for the given bloom bit and section
|
||||
// numbers.
|
||||
func generateBitset(bit uint, section uint64) []byte { |
||||
bitset := make([]byte, testSectionSize/8) |
||||
for i := 0; i < len(bitset); i++ { |
||||
for b := 0; b < 8; b++ { |
||||
blockIdx := section*testSectionSize + uint64(i*8+b) |
||||
bitset[i] += bitset[i] |
||||
if (blockIdx % uint64(bit)) == 0 { |
||||
bitset[i]++ |
||||
} |
||||
} |
||||
} |
||||
return bitset |
||||
} |
||||
|
||||
func expMatch1(filter bloomIndexes, i uint64) bool { |
||||
for _, ii := range filter { |
||||
if (i % uint64(ii)) != 0 { |
||||
return false |
||||
} |
||||
} |
||||
return true |
||||
} |
||||
|
||||
func expMatch2(filter []bloomIndexes, i uint64) bool { |
||||
for _, ii := range filter { |
||||
if expMatch1(ii, i) { |
||||
return true |
||||
} |
||||
} |
||||
return false |
||||
} |
||||
|
||||
func expMatch3(filter [][]bloomIndexes, i uint64) bool { |
||||
for _, ii := range filter { |
||||
if !expMatch2(ii, i) { |
||||
return false |
||||
} |
||||
} |
||||
return true |
||||
} |
@ -1,181 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bloombits |
||||
|
||||
import ( |
||||
"sync" |
||||
) |
||||
|
||||
// request represents a bloom retrieval task to prioritize and pull from the local
|
||||
// database or remotely from the network.
|
||||
type request struct { |
||||
section uint64 // Section index to retrieve the bit-vector from
|
||||
bit uint // Bit index within the section to retrieve the vector of
|
||||
} |
||||
|
||||
// response represents the state of a requested bit-vector through a scheduler.
|
||||
type response struct { |
||||
cached []byte // Cached bits to dedup multiple requests
|
||||
done chan struct{} // Channel to allow waiting for completion
|
||||
} |
||||
|
||||
// scheduler handles the scheduling of bloom-filter retrieval operations for
|
||||
// entire section-batches belonging to a single bloom bit. Beside scheduling the
|
||||
// retrieval operations, this struct also deduplicates the requests and caches
|
||||
// the results to minimize network/database overhead even in complex filtering
|
||||
// scenarios.
|
||||
type scheduler struct { |
||||
bit uint // Index of the bit in the bloom filter this scheduler is responsible for
|
||||
responses map[uint64]*response // Currently pending retrieval requests or already cached responses
|
||||
lock sync.Mutex // Lock protecting the responses from concurrent access
|
||||
} |
||||
|
||||
// newScheduler creates a new bloom-filter retrieval scheduler for a specific
|
||||
// bit index.
|
||||
func newScheduler(idx uint) *scheduler { |
||||
return &scheduler{ |
||||
bit: idx, |
||||
responses: make(map[uint64]*response), |
||||
} |
||||
} |
||||
|
||||
// run creates a retrieval pipeline, receiving section indexes from sections and
|
||||
// returning the results in the same order through the done channel. Concurrent
|
||||
// runs of the same scheduler are allowed, leading to retrieval task deduplication.
|
||||
func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) { |
||||
// Create a forwarder channel between requests and responses of the same size as
|
||||
// the distribution channel (since that will block the pipeline anyway).
|
||||
pend := make(chan uint64, cap(dist)) |
||||
|
||||
// Start the pipeline schedulers to forward between user -> distributor -> user
|
||||
wg.Add(2) |
||||
go s.scheduleRequests(sections, dist, pend, quit, wg) |
||||
go s.scheduleDeliveries(pend, done, quit, wg) |
||||
} |
||||
|
||||
// reset cleans up any leftovers from previous runs. This is required before a
|
||||
// restart to ensure the no previously requested but never delivered state will
|
||||
// cause a lockup.
|
||||
func (s *scheduler) reset() { |
||||
s.lock.Lock() |
||||
defer s.lock.Unlock() |
||||
|
||||
for section, res := range s.responses { |
||||
if res.cached == nil { |
||||
delete(s.responses, section) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// scheduleRequests reads section retrieval requests from the input channel,
|
||||
// deduplicates the stream and pushes unique retrieval tasks into the distribution
|
||||
// channel for a database or network layer to honour.
|
||||
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) { |
||||
// Clean up the goroutine and pipeline when done
|
||||
defer wg.Done() |
||||
defer close(pend) |
||||
|
||||
// Keep reading and scheduling section requests
|
||||
for { |
||||
select { |
||||
case <-quit: |
||||
return |
||||
|
||||
case section, ok := <-reqs: |
||||
// New section retrieval requested
|
||||
if !ok { |
||||
return |
||||
} |
||||
// Deduplicate retrieval requests
|
||||
unique := false |
||||
|
||||
s.lock.Lock() |
||||
if s.responses[section] == nil { |
||||
s.responses[section] = &response{ |
||||
done: make(chan struct{}), |
||||
} |
||||
unique = true |
||||
} |
||||
s.lock.Unlock() |
||||
|
||||
// Schedule the section for retrieval and notify the deliverer to expect this section
|
||||
if unique { |
||||
select { |
||||
case <-quit: |
||||
return |
||||
case dist <- &request{bit: s.bit, section: section}: |
||||
} |
||||
} |
||||
select { |
||||
case <-quit: |
||||
return |
||||
case pend <- section: |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// scheduleDeliveries reads section acceptance notifications and waits for them
|
||||
// to be delivered, pushing them into the output data buffer.
|
||||
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) { |
||||
// Clean up the goroutine and pipeline when done
|
||||
defer wg.Done() |
||||
defer close(done) |
||||
|
||||
// Keep reading notifications and scheduling deliveries
|
||||
for { |
||||
select { |
||||
case <-quit: |
||||
return |
||||
|
||||
case idx, ok := <-pend: |
||||
// New section retrieval pending
|
||||
if !ok { |
||||
return |
||||
} |
||||
// Wait until the request is honoured
|
||||
s.lock.Lock() |
||||
res := s.responses[idx] |
||||
s.lock.Unlock() |
||||
|
||||
select { |
||||
case <-quit: |
||||
return |
||||
case <-res.done: |
||||
} |
||||
// Deliver the result
|
||||
select { |
||||
case <-quit: |
||||
return |
||||
case done <- res.cached: |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// deliver is called by the request distributor when a reply to a request arrives.
|
||||
func (s *scheduler) deliver(sections []uint64, data [][]byte) { |
||||
s.lock.Lock() |
||||
defer s.lock.Unlock() |
||||
|
||||
for i, section := range sections { |
||||
if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
|
||||
res.cached = data[i] |
||||
close(res.done) |
||||
} |
||||
} |
||||
} |
@ -1,103 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bloombits |
||||
|
||||
import ( |
||||
"bytes" |
||||
"math/big" |
||||
"sync" |
||||
"sync/atomic" |
||||
"testing" |
||||
) |
||||
|
||||
// Tests that the scheduler can deduplicate and forward retrieval requests to
|
||||
// underlying fetchers and serve responses back, irrelevant of the concurrency
|
||||
// of the requesting clients or serving data fetchers.
|
||||
func TestSchedulerSingleClientSingleFetcher(t *testing.T) { testScheduler(t, 1, 1, 5000) } |
||||
func TestSchedulerSingleClientMultiFetcher(t *testing.T) { testScheduler(t, 1, 10, 5000) } |
||||
func TestSchedulerMultiClientSingleFetcher(t *testing.T) { testScheduler(t, 10, 1, 5000) } |
||||
func TestSchedulerMultiClientMultiFetcher(t *testing.T) { testScheduler(t, 10, 10, 5000) } |
||||
|
||||
func testScheduler(t *testing.T, clients int, fetchers int, requests int) { |
||||
t.Parallel() |
||||
f := newScheduler(0) |
||||
|
||||
// Create a batch of handler goroutines that respond to bloom bit requests and
|
||||
// deliver them to the scheduler.
|
||||
var fetchPend sync.WaitGroup |
||||
fetchPend.Add(fetchers) |
||||
defer fetchPend.Wait() |
||||
|
||||
fetch := make(chan *request, 16) |
||||
defer close(fetch) |
||||
|
||||
var delivered atomic.Uint32 |
||||
for i := 0; i < fetchers; i++ { |
||||
go func() { |
||||
defer fetchPend.Done() |
||||
|
||||
for req := range fetch { |
||||
delivered.Add(1) |
||||
|
||||
f.deliver([]uint64{ |
||||
req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
|
||||
req.section, // Requested data
|
||||
req.section, // Duplicated data (ensure it doesn't double close anything)
|
||||
}, [][]byte{ |
||||
{}, |
||||
new(big.Int).SetUint64(req.section).Bytes(), |
||||
new(big.Int).SetUint64(req.section).Bytes(), |
||||
}) |
||||
} |
||||
}() |
||||
} |
||||
// Start a batch of goroutines to concurrently run scheduling tasks
|
||||
quit := make(chan struct{}) |
||||
|
||||
var pend sync.WaitGroup |
||||
pend.Add(clients) |
||||
|
||||
for i := 0; i < clients; i++ { |
||||
go func() { |
||||
defer pend.Done() |
||||
|
||||
in := make(chan uint64, 16) |
||||
out := make(chan []byte, 16) |
||||
|
||||
f.run(in, fetch, out, quit, &pend) |
||||
|
||||
go func() { |
||||
for j := 0; j < requests; j++ { |
||||
in <- uint64(j) |
||||
} |
||||
close(in) |
||||
}() |
||||
b := new(big.Int) |
||||
for j := 0; j < requests; j++ { |
||||
bits := <-out |
||||
if want := b.SetUint64(uint64(j)).Bytes(); !bytes.Equal(bits, want) { |
||||
t.Errorf("vector %d: delivered content mismatch: have %x, want %x", j, bits, want) |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
pend.Wait() |
||||
|
||||
if have := delivered.Load(); int(have) != requests { |
||||
t.Errorf("request count mismatch: have %v, want %v", have, requests) |
||||
} |
||||
} |
@ -1,522 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package core |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/binary" |
||||
"errors" |
||||
"fmt" |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/core/rawdb" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/ethdb" |
||||
"github.com/ethereum/go-ethereum/event" |
||||
"github.com/ethereum/go-ethereum/log" |
||||
) |
||||
|
||||
// ChainIndexerBackend defines the methods needed to process chain segments in
|
||||
// the background and write the segment results into the database. These can be
|
||||
// used to create filter blooms or CHTs.
|
||||
type ChainIndexerBackend interface { |
||||
// Reset initiates the processing of a new chain segment, potentially terminating
|
||||
// any partially completed operations (in case of a reorg).
|
||||
Reset(ctx context.Context, section uint64, prevHead common.Hash) error |
||||
|
||||
// Process crunches through the next header in the chain segment. The caller
|
||||
// will ensure a sequential order of headers.
|
||||
Process(ctx context.Context, header *types.Header) error |
||||
|
||||
// Commit finalizes the section metadata and stores it into the database.
|
||||
Commit() error |
||||
|
||||
// Prune deletes the chain index older than the given threshold.
|
||||
Prune(threshold uint64) error |
||||
} |
||||
|
||||
// ChainIndexerChain interface is used for connecting the indexer to a blockchain
|
||||
type ChainIndexerChain interface { |
||||
// CurrentHeader retrieves the latest locally known header.
|
||||
CurrentHeader() *types.Header |
||||
|
||||
// SubscribeChainHeadEvent subscribes to new head header notifications.
|
||||
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription |
||||
} |
||||
|
||||
// ChainIndexer does a post-processing job for equally sized sections of the
|
||||
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
|
||||
// connected to the blockchain through the event system by starting a
|
||||
// ChainHeadEventLoop in a goroutine.
|
||||
//
|
||||
// Further child ChainIndexers can be added which use the output of the parent
|
||||
// section indexer. These child indexers receive new head notifications only
|
||||
// after an entire section has been finished or in case of rollbacks that might
|
||||
// affect already finished sections.
|
||||
type ChainIndexer struct { |
||||
chainDb ethdb.Database // Chain database to index the data from
|
||||
indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
|
||||
backend ChainIndexerBackend // Background processor generating the index data content
|
||||
children []*ChainIndexer // Child indexers to cascade chain updates to
|
||||
|
||||
active atomic.Bool // Flag whether the event loop was started
|
||||
update chan struct{} // Notification channel that headers should be processed
|
||||
quit chan chan error // Quit channel to tear down running goroutines
|
||||
ctx context.Context |
||||
ctxCancel func() |
||||
|
||||
sectionSize uint64 // Number of blocks in a single chain segment to process
|
||||
confirmsReq uint64 // Number of confirmations before processing a completed segment
|
||||
|
||||
storedSections uint64 // Number of sections successfully indexed into the database
|
||||
knownSections uint64 // Number of sections known to be complete (block wise)
|
||||
cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
|
||||
|
||||
checkpointSections uint64 // Number of sections covered by the checkpoint
|
||||
checkpointHead common.Hash // Section head belonging to the checkpoint
|
||||
|
||||
throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
|
||||
|
||||
log log.Logger |
||||
lock sync.Mutex |
||||
} |
||||
|
||||
// NewChainIndexer creates a new chain indexer to do background processing on
|
||||
// chain segments of a given size after certain number of confirmations passed.
|
||||
// The throttling parameter might be used to prevent database thrashing.
|
||||
func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { |
||||
c := &ChainIndexer{ |
||||
chainDb: chainDb, |
||||
indexDb: indexDb, |
||||
backend: backend, |
||||
update: make(chan struct{}, 1), |
||||
quit: make(chan chan error), |
||||
sectionSize: section, |
||||
confirmsReq: confirm, |
||||
throttling: throttling, |
||||
log: log.New("type", kind), |
||||
} |
||||
// Initialize database dependent fields and start the updater
|
||||
c.loadValidSections() |
||||
c.ctx, c.ctxCancel = context.WithCancel(context.Background()) |
||||
|
||||
go c.updateLoop() |
||||
|
||||
return c |
||||
} |
||||
|
||||
// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
|
||||
// is not expected to be available before this point. The indexer assumes that
|
||||
// the backend has sufficient information available to process subsequent sections.
|
||||
//
|
||||
// Note: knownSections == 0 and storedSections == checkpointSections until
|
||||
// syncing reaches the checkpoint
|
||||
func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) { |
||||
c.lock.Lock() |
||||
defer c.lock.Unlock() |
||||
|
||||
// Short circuit if the given checkpoint is below than local's.
|
||||
if c.checkpointSections >= section+1 || section < c.storedSections { |
||||
return |
||||
} |
||||
c.checkpointSections = section + 1 |
||||
c.checkpointHead = shead |
||||
|
||||
c.setSectionHead(section, shead) |
||||
c.setValidSections(section + 1) |
||||
} |
||||
|
||||
// Start creates a goroutine to feed chain head events into the indexer for
|
||||
// cascading background processing. Children do not need to be started, they
|
||||
// are notified about new events by their parents.
|
||||
func (c *ChainIndexer) Start(chain ChainIndexerChain) { |
||||
events := make(chan ChainHeadEvent, 10) |
||||
sub := chain.SubscribeChainHeadEvent(events) |
||||
|
||||
go c.eventLoop(chain.CurrentHeader(), events, sub) |
||||
} |
||||
|
||||
// Close tears down all goroutines belonging to the indexer and returns any error
|
||||
// that might have occurred internally.
|
||||
func (c *ChainIndexer) Close() error { |
||||
var errs []error |
||||
|
||||
c.ctxCancel() |
||||
|
||||
// Tear down the primary update loop
|
||||
errc := make(chan error) |
||||
c.quit <- errc |
||||
if err := <-errc; err != nil { |
||||
errs = append(errs, err) |
||||
} |
||||
// If needed, tear down the secondary event loop
|
||||
if c.active.Load() { |
||||
c.quit <- errc |
||||
if err := <-errc; err != nil { |
||||
errs = append(errs, err) |
||||
} |
||||
} |
||||
// Close all children
|
||||
for _, child := range c.children { |
||||
if err := child.Close(); err != nil { |
||||
errs = append(errs, err) |
||||
} |
||||
} |
||||
// Return any failures
|
||||
switch { |
||||
case len(errs) == 0: |
||||
return nil |
||||
|
||||
case len(errs) == 1: |
||||
return errs[0] |
||||
|
||||
default: |
||||
return fmt.Errorf("%v", errs) |
||||
} |
||||
} |
||||
|
||||
// eventLoop is a secondary - optional - event loop of the indexer which is only
|
||||
// started for the outermost indexer to push chain head events into a processing
|
||||
// queue.
|
||||
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) { |
||||
// Mark the chain indexer as active, requiring an additional teardown
|
||||
c.active.Store(true) |
||||
|
||||
defer sub.Unsubscribe() |
||||
|
||||
// Fire the initial new head event to start any outstanding processing
|
||||
c.newHead(currentHeader.Number.Uint64(), false) |
||||
|
||||
var ( |
||||
prevHeader = currentHeader |
||||
prevHash = currentHeader.Hash() |
||||
) |
||||
for { |
||||
select { |
||||
case errc := <-c.quit: |
||||
// Chain indexer terminating, report no failure and abort
|
||||
errc <- nil |
||||
return |
||||
|
||||
case ev, ok := <-events: |
||||
// Received a new event, ensure it's not nil (closing) and update
|
||||
if !ok { |
||||
errc := <-c.quit |
||||
errc <- nil |
||||
return |
||||
} |
||||
if ev.Header.ParentHash != prevHash { |
||||
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
|
||||
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
|
||||
|
||||
if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash { |
||||
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil { |
||||
c.newHead(h.Number.Uint64(), true) |
||||
} |
||||
} |
||||
} |
||||
c.newHead(ev.Header.Number.Uint64(), false) |
||||
|
||||
prevHeader, prevHash = ev.Header, ev.Header.Hash() |
||||
} |
||||
} |
||||
} |
||||
|
||||
// newHead notifies the indexer about new chain heads and/or reorgs.
|
||||
func (c *ChainIndexer) newHead(head uint64, reorg bool) { |
||||
c.lock.Lock() |
||||
defer c.lock.Unlock() |
||||
|
||||
// If a reorg happened, invalidate all sections until that point
|
||||
if reorg { |
||||
// Revert the known section number to the reorg point
|
||||
known := (head + 1) / c.sectionSize |
||||
stored := known |
||||
if known < c.checkpointSections { |
||||
known = 0 |
||||
} |
||||
if stored < c.checkpointSections { |
||||
stored = c.checkpointSections |
||||
} |
||||
if known < c.knownSections { |
||||
c.knownSections = known |
||||
} |
||||
// Revert the stored sections from the database to the reorg point
|
||||
if stored < c.storedSections { |
||||
c.setValidSections(stored) |
||||
} |
||||
// Update the new head number to the finalized section end and notify children
|
||||
head = known * c.sectionSize |
||||
|
||||
if head < c.cascadedHead { |
||||
c.cascadedHead = head |
||||
for _, child := range c.children { |
||||
child.newHead(c.cascadedHead, true) |
||||
} |
||||
} |
||||
return |
||||
} |
||||
// No reorg, calculate the number of newly known sections and update if high enough
|
||||
var sections uint64 |
||||
if head >= c.confirmsReq { |
||||
sections = (head + 1 - c.confirmsReq) / c.sectionSize |
||||
if sections < c.checkpointSections { |
||||
sections = 0 |
||||
} |
||||
if sections > c.knownSections { |
||||
if c.knownSections < c.checkpointSections { |
||||
// syncing reached the checkpoint, verify section head
|
||||
syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1) |
||||
if syncedHead != c.checkpointHead { |
||||
c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead) |
||||
return |
||||
} |
||||
} |
||||
c.knownSections = sections |
||||
|
||||
select { |
||||
case c.update <- struct{}{}: |
||||
default: |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// updateLoop is the main event loop of the indexer which pushes chain segments
|
||||
// down into the processing backend.
|
||||
func (c *ChainIndexer) updateLoop() { |
||||
var ( |
||||
updating bool |
||||
updated time.Time |
||||
) |
||||
|
||||
for { |
||||
select { |
||||
case errc := <-c.quit: |
||||
// Chain indexer terminating, report no failure and abort
|
||||
errc <- nil |
||||
return |
||||
|
||||
case <-c.update: |
||||
// Section headers completed (or rolled back), update the index
|
||||
c.lock.Lock() |
||||
if c.knownSections > c.storedSections { |
||||
// Periodically print an upgrade log message to the user
|
||||
if time.Since(updated) > 8*time.Second { |
||||
if c.knownSections > c.storedSections+1 { |
||||
updating = true |
||||
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) |
||||
} |
||||
updated = time.Now() |
||||
} |
||||
// Cache the current section count and head to allow unlocking the mutex
|
||||
c.verifyLastHead() |
||||
section := c.storedSections |
||||
var oldHead common.Hash |
||||
if section > 0 { |
||||
oldHead = c.SectionHead(section - 1) |
||||
} |
||||
// Process the newly defined section in the background
|
||||
c.lock.Unlock() |
||||
newHead, err := c.processSection(section, oldHead) |
||||
if err != nil { |
||||
select { |
||||
case <-c.ctx.Done(): |
||||
<-c.quit <- nil |
||||
return |
||||
default: |
||||
} |
||||
c.log.Error("Section processing failed", "error", err) |
||||
} |
||||
c.lock.Lock() |
||||
|
||||
// If processing succeeded and no reorgs occurred, mark the section completed
|
||||
if err == nil && (section == 0 || oldHead == c.SectionHead(section-1)) { |
||||
c.setSectionHead(section, newHead) |
||||
c.setValidSections(section + 1) |
||||
if c.storedSections == c.knownSections && updating { |
||||
updating = false |
||||
c.log.Info("Finished upgrading chain index") |
||||
} |
||||
c.cascadedHead = c.storedSections*c.sectionSize - 1 |
||||
for _, child := range c.children { |
||||
c.log.Trace("Cascading chain index update", "head", c.cascadedHead) |
||||
child.newHead(c.cascadedHead, false) |
||||
} |
||||
} else { |
||||
// If processing failed, don't retry until further notification
|
||||
c.log.Debug("Chain index processing failed", "section", section, "err", err) |
||||
c.verifyLastHead() |
||||
c.knownSections = c.storedSections |
||||
} |
||||
} |
||||
// If there are still further sections to process, reschedule
|
||||
if c.knownSections > c.storedSections { |
||||
time.AfterFunc(c.throttling, func() { |
||||
select { |
||||
case c.update <- struct{}{}: |
||||
default: |
||||
} |
||||
}) |
||||
} |
||||
c.lock.Unlock() |
||||
} |
||||
} |
||||
} |
||||
|
||||
// processSection processes an entire section by calling backend functions while
|
||||
// ensuring the continuity of the passed headers. Since the chain mutex is not
|
||||
// held while processing, the continuity can be broken by a long reorg, in which
|
||||
// case the function returns with an error.
|
||||
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { |
||||
c.log.Trace("Processing new chain section", "section", section) |
||||
|
||||
// Reset and partial processing
|
||||
if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { |
||||
c.setValidSections(0) |
||||
return common.Hash{}, err |
||||
} |
||||
|
||||
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { |
||||
hash := rawdb.ReadCanonicalHash(c.chainDb, number) |
||||
if hash == (common.Hash{}) { |
||||
return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) |
||||
} |
||||
header := rawdb.ReadHeader(c.chainDb, hash, number) |
||||
if header == nil { |
||||
return common.Hash{}, fmt.Errorf("block #%d [%x..] not found", number, hash[:4]) |
||||
} else if header.ParentHash != lastHead { |
||||
return common.Hash{}, errors.New("chain reorged during section processing") |
||||
} |
||||
if err := c.backend.Process(c.ctx, header); err != nil { |
||||
return common.Hash{}, err |
||||
} |
||||
lastHead = header.Hash() |
||||
} |
||||
if err := c.backend.Commit(); err != nil { |
||||
return common.Hash{}, err |
||||
} |
||||
return lastHead, nil |
||||
} |
||||
|
||||
// verifyLastHead compares last stored section head with the corresponding block hash in the
|
||||
// actual canonical chain and rolls back reorged sections if necessary to ensure that stored
|
||||
// sections are all valid
|
||||
func (c *ChainIndexer) verifyLastHead() { |
||||
for c.storedSections > 0 && c.storedSections > c.checkpointSections { |
||||
if c.SectionHead(c.storedSections-1) == rawdb.ReadCanonicalHash(c.chainDb, c.storedSections*c.sectionSize-1) { |
||||
return |
||||
} |
||||
c.setValidSections(c.storedSections - 1) |
||||
} |
||||
} |
||||
|
||||
// Sections returns the number of processed sections maintained by the indexer
|
||||
// and also the information about the last header indexed for potential canonical
|
||||
// verifications.
|
||||
func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { |
||||
c.lock.Lock() |
||||
defer c.lock.Unlock() |
||||
|
||||
c.verifyLastHead() |
||||
return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) |
||||
} |
||||
|
||||
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
|
||||
func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { |
||||
if indexer == c { |
||||
panic("can't add indexer as a child of itself") |
||||
} |
||||
c.lock.Lock() |
||||
defer c.lock.Unlock() |
||||
|
||||
c.children = append(c.children, indexer) |
||||
|
||||
// Cascade any pending updates to new children too
|
||||
sections := c.storedSections |
||||
if c.knownSections < sections { |
||||
// if a section is "stored" but not "known" then it is a checkpoint without
|
||||
// available chain data so we should not cascade it yet
|
||||
sections = c.knownSections |
||||
} |
||||
if sections > 0 { |
||||
indexer.newHead(sections*c.sectionSize-1, false) |
||||
} |
||||
} |
||||
|
||||
// Prune deletes all chain data older than given threshold.
|
||||
func (c *ChainIndexer) Prune(threshold uint64) error { |
||||
return c.backend.Prune(threshold) |
||||
} |
||||
|
||||
// loadValidSections reads the number of valid sections from the index database
|
||||
// and caches is into the local state.
|
||||
func (c *ChainIndexer) loadValidSections() { |
||||
data, _ := c.indexDb.Get([]byte("count")) |
||||
if len(data) == 8 { |
||||
c.storedSections = binary.BigEndian.Uint64(data) |
||||
} |
||||
} |
||||
|
||||
// setValidSections writes the number of valid sections to the index database
|
||||
func (c *ChainIndexer) setValidSections(sections uint64) { |
||||
// Set the current number of valid sections in the database
|
||||
var data [8]byte |
||||
binary.BigEndian.PutUint64(data[:], sections) |
||||
c.indexDb.Put([]byte("count"), data[:]) |
||||
|
||||
// Remove any reorged sections, caching the valids in the mean time
|
||||
for c.storedSections > sections { |
||||
c.storedSections-- |
||||
c.removeSectionHead(c.storedSections) |
||||
} |
||||
c.storedSections = sections // needed if new > old
|
||||
} |
||||
|
||||
// SectionHead retrieves the last block hash of a processed section from the
|
||||
// index database.
|
||||
func (c *ChainIndexer) SectionHead(section uint64) common.Hash { |
||||
var data [8]byte |
||||
binary.BigEndian.PutUint64(data[:], section) |
||||
|
||||
hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) |
||||
if len(hash) == len(common.Hash{}) { |
||||
return common.BytesToHash(hash) |
||||
} |
||||
return common.Hash{} |
||||
} |
||||
|
||||
// setSectionHead writes the last block hash of a processed section to the index
|
||||
// database.
|
||||
func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) { |
||||
var data [8]byte |
||||
binary.BigEndian.PutUint64(data[:], section) |
||||
|
||||
c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes()) |
||||
} |
||||
|
||||
// removeSectionHead removes the reference to a processed section from the index
|
||||
// database.
|
||||
func (c *ChainIndexer) removeSectionHead(section uint64) { |
||||
var data [8]byte |
||||
binary.BigEndian.PutUint64(data[:], section) |
||||
|
||||
c.indexDb.Delete(append([]byte("shead"), data[:]...)) |
||||
} |
@ -1,246 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package core |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"math/big" |
||||
"math/rand" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/core/rawdb" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
) |
||||
|
||||
// Runs multiple tests with randomized parameters.
|
||||
func TestChainIndexerSingle(t *testing.T) { |
||||
for i := 0; i < 10; i++ { |
||||
testChainIndexer(t, 1) |
||||
} |
||||
} |
||||
|
||||
// Runs multiple tests with randomized parameters and different number of
|
||||
// chain backends.
|
||||
func TestChainIndexerWithChildren(t *testing.T) { |
||||
for i := 2; i < 8; i++ { |
||||
testChainIndexer(t, i) |
||||
} |
||||
} |
||||
|
||||
// testChainIndexer runs a test with either a single chain indexer or a chain of
|
||||
// multiple backends. The section size and required confirmation count parameters
|
||||
// are randomized.
|
||||
func testChainIndexer(t *testing.T, count int) { |
||||
db := rawdb.NewMemoryDatabase() |
||||
defer db.Close() |
||||
|
||||
// Create a chain of indexers and ensure they all report empty
|
||||
backends := make([]*testChainIndexBackend, count) |
||||
for i := 0; i < count; i++ { |
||||
var ( |
||||
sectionSize = uint64(rand.Intn(100) + 1) |
||||
confirmsReq = uint64(rand.Intn(10)) |
||||
) |
||||
backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)} |
||||
backends[i].indexer = NewChainIndexer(db, rawdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i)) |
||||
|
||||
if sections, _, _ := backends[i].indexer.Sections(); sections != 0 { |
||||
t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0) |
||||
} |
||||
if i > 0 { |
||||
backends[i-1].indexer.AddChildIndexer(backends[i].indexer) |
||||
} |
||||
} |
||||
defer backends[0].indexer.Close() // parent indexer shuts down children
|
||||
// notify pings the root indexer about a new head or reorg, then expect
|
||||
// processed blocks if a section is processable
|
||||
notify := func(headNum, failNum uint64, reorg bool) { |
||||
backends[0].indexer.newHead(headNum, reorg) |
||||
if reorg { |
||||
for _, backend := range backends { |
||||
headNum = backend.reorg(headNum) |
||||
backend.assertSections() |
||||
} |
||||
return |
||||
} |
||||
var cascade bool |
||||
for _, backend := range backends { |
||||
headNum, cascade = backend.assertBlocks(headNum, failNum) |
||||
if !cascade { |
||||
break |
||||
} |
||||
backend.assertSections() |
||||
} |
||||
} |
||||
// inject inserts a new random canonical header into the database directly
|
||||
inject := func(number uint64) { |
||||
header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()} |
||||
if number > 0 { |
||||
header.ParentHash = rawdb.ReadCanonicalHash(db, number-1) |
||||
} |
||||
rawdb.WriteHeader(db, header) |
||||
rawdb.WriteCanonicalHash(db, header.Hash(), number) |
||||
} |
||||
// Start indexer with an already existing chain
|
||||
for i := uint64(0); i <= 100; i++ { |
||||
inject(i) |
||||
} |
||||
notify(100, 100, false) |
||||
|
||||
// Add new blocks one by one
|
||||
for i := uint64(101); i <= 1000; i++ { |
||||
inject(i) |
||||
notify(i, i, false) |
||||
} |
||||
// Do a reorg
|
||||
notify(500, 500, true) |
||||
|
||||
// Create new fork
|
||||
for i := uint64(501); i <= 1000; i++ { |
||||
inject(i) |
||||
notify(i, i, false) |
||||
} |
||||
for i := uint64(1001); i <= 1500; i++ { |
||||
inject(i) |
||||
} |
||||
// Failed processing scenario where less blocks are available than notified
|
||||
notify(2000, 1500, false) |
||||
|
||||
// Notify about a reorg (which could have caused the missing blocks if happened during processing)
|
||||
notify(1500, 1500, true) |
||||
|
||||
// Create new fork
|
||||
for i := uint64(1501); i <= 2000; i++ { |
||||
inject(i) |
||||
notify(i, i, false) |
||||
} |
||||
} |
||||
|
||||
// testChainIndexBackend implements ChainIndexerBackend
|
||||
type testChainIndexBackend struct { |
||||
t *testing.T |
||||
indexer *ChainIndexer |
||||
section, headerCnt, stored uint64 |
||||
processCh chan uint64 |
||||
} |
||||
|
||||
// assertSections verifies if a chain indexer has the correct number of section.
|
||||
func (b *testChainIndexBackend) assertSections() { |
||||
// Keep trying for 3 seconds if it does not match
|
||||
var sections uint64 |
||||
for i := 0; i < 300; i++ { |
||||
sections, _, _ = b.indexer.Sections() |
||||
if sections == b.stored { |
||||
return |
||||
} |
||||
time.Sleep(10 * time.Millisecond) |
||||
} |
||||
b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored) |
||||
} |
||||
|
||||
// assertBlocks expects processing calls after new blocks have arrived. If the
|
||||
// failNum < headNum then we are simulating a scenario where a reorg has happened
|
||||
// after the processing has started and the processing of a section fails.
|
||||
func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) { |
||||
var sections uint64 |
||||
if headNum >= b.indexer.confirmsReq { |
||||
sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize |
||||
if sections > b.stored { |
||||
// expect processed blocks
|
||||
for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ { |
||||
if expectd > failNum { |
||||
// rolled back after processing started, no more process calls expected
|
||||
// wait until updating is done to make sure that processing actually fails
|
||||
var updating bool |
||||
for i := 0; i < 300; i++ { |
||||
b.indexer.lock.Lock() |
||||
updating = b.indexer.knownSections > b.indexer.storedSections |
||||
b.indexer.lock.Unlock() |
||||
if !updating { |
||||
break |
||||
} |
||||
time.Sleep(10 * time.Millisecond) |
||||
} |
||||
if updating { |
||||
b.t.Fatalf("update did not finish") |
||||
} |
||||
sections = expectd / b.indexer.sectionSize |
||||
break |
||||
} |
||||
select { |
||||
case <-time.After(10 * time.Second): |
||||
b.t.Fatalf("Expected processed block #%d, got nothing", expectd) |
||||
case processed := <-b.processCh: |
||||
if processed != expectd { |
||||
b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed) |
||||
} |
||||
} |
||||
} |
||||
b.stored = sections |
||||
} |
||||
} |
||||
if b.stored == 0 { |
||||
return 0, false |
||||
} |
||||
return b.stored*b.indexer.sectionSize - 1, true |
||||
} |
||||
|
||||
func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { |
||||
firstChanged := (headNum + 1) / b.indexer.sectionSize |
||||
if firstChanged < b.stored { |
||||
b.stored = firstChanged |
||||
} |
||||
return b.stored * b.indexer.sectionSize |
||||
} |
||||
|
||||
func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error { |
||||
b.section = section |
||||
b.headerCnt = 0 |
||||
return nil |
||||
} |
||||
|
||||
func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error { |
||||
b.headerCnt++ |
||||
if b.headerCnt > b.indexer.sectionSize { |
||||
b.t.Error("Processing too many headers") |
||||
} |
||||
//t.processCh <- header.Number.Uint64()
|
||||
select { |
||||
case <-time.After(10 * time.Second): |
||||
b.t.Error("Unexpected call to Process") |
||||
// Can't use Fatal since this is not the test's goroutine.
|
||||
// Returning error stops the chainIndexer's updateLoop
|
||||
return errors.New("unexpected call to Process") |
||||
case b.processCh <- header.Number.Uint64(): |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (b *testChainIndexBackend) Commit() error { |
||||
if b.headerCnt != b.indexer.sectionSize { |
||||
b.t.Error("Not enough headers processed") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (b *testChainIndexBackend) Prune(threshold uint64) error { |
||||
return nil |
||||
} |
@ -1,74 +0,0 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package eth |
||||
|
||||
import ( |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common/bitutil" |
||||
"github.com/ethereum/go-ethereum/core/rawdb" |
||||
) |
||||
|
||||
const ( |
||||
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
|
||||
// instance to service bloombits lookups for all running filters.
|
||||
bloomServiceThreads = 16 |
||||
|
||||
// bloomFilterThreads is the number of goroutines used locally per filter to
|
||||
// multiplex requests onto the global servicing goroutines.
|
||||
bloomFilterThreads = 3 |
||||
|
||||
// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
|
||||
// in a single batch.
|
||||
bloomRetrievalBatch = 16 |
||||
|
||||
// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
|
||||
// to accumulate request an entire batch (avoiding hysteresis).
|
||||
bloomRetrievalWait = time.Duration(0) |
||||
) |
||||
|
||||
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
|
||||
// retrievals from possibly a range of filters and serving the data to satisfy.
|
||||
func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { |
||||
for i := 0; i < bloomServiceThreads; i++ { |
||||
go func() { |
||||
for { |
||||
select { |
||||
case <-eth.closeBloomHandler: |
||||
return |
||||
|
||||
case request := <-eth.bloomRequests: |
||||
task := <-request |
||||
task.Bitsets = make([][]byte, len(task.Sections)) |
||||
for i, section := range task.Sections { |
||||
head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1) |
||||
if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil { |
||||
if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { |
||||
task.Bitsets[i] = blob |
||||
} else { |
||||
task.Error = err |
||||
} |
||||
} else { |
||||
task.Error = err |
||||
} |
||||
} |
||||
request <- task |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
} |
Loading…
Reference in new issue