eth/filters: fixed tests, added more

Zsolt Felfoldi 5 months ago
parent 4ad24e0b07
commit 464ae36769
  1. 18
      core/filtermaps/filtermaps.go
  2. 57
      core/filtermaps/indexer.go
  3. 74
      core/filtermaps/indexer_test.go
  4. 7
      core/filtermaps/matcher_backend.go
  5. 2
      core/rawdb/accessors_indexes_test.go
  6. 4
      core/rawdb/database.go
  7. 160
      eth/filters/filter.go
  8. 36
      eth/filters/filter_system_test.go
  9. 184
      eth/filters/filter_test.go
  10. 4
      internal/ethapi/api_test.go
  11. 3
      internal/ethapi/transaction_args_test.go

@ -20,7 +20,7 @@ const headCacheSize = 8 // maximum number of recent filter maps cached in memory
// blockchain defines functions required by the FilterMaps log indexer.
type blockchain interface {
CurrentBlock() *types.Header
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetHeader(hash common.Hash, number uint64) *types.Header
GetCanonicalHash(number uint64) common.Hash
GetReceiptsByHash(hash common.Hash) types.Receipts
@ -55,7 +55,8 @@ type FilterMaps struct {
lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint
testHook func(int)
waitIdleCh chan chan bool
testHook func(int)
}
// filterMap is a full or partial in-memory representation of a filter map where
@ -104,12 +105,13 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
}
params.deriveFields()
fm := &FilterMaps{
db: db,
chain: chain,
closeCh: make(chan struct{}),
history: history,
noHistory: noHistory,
Params: params,
db: db,
chain: chain,
closeCh: make(chan struct{}),
waitIdleCh: make(chan chan bool),
history: history,
noHistory: noHistory,
Params: params,
filterMapsRange: filterMapsRange{
initialized: rs.Initialized,
headLvPointer: rs.HeadLvPointer,

@ -17,7 +17,9 @@ const (
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory
)
const (
testHookInit = iota
testHookUpdateHeadEpoch
testHookUpdateHead
@ -52,8 +54,8 @@ func (f *FilterMaps) updateLoop() {
}
var (
headEventCh = make(chan core.ChainHeadEvent, 10)
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
headEventCh = make(chan core.ChainEvent, 10)
sub = f.chain.SubscribeChainEvent(headEventCh)
head = f.chain.CurrentBlock()
stop bool
syncMatcher *FilterMapsMatcherBackend
@ -61,7 +63,7 @@ func (f *FilterMaps) updateLoop() {
)
matcherSync := func() {
if syncMatcher != nil && fmr.headBlockHash == head.Hash() {
if syncMatcher != nil && fmr.initialized && fmr.headBlockHash == head.Hash() {
syncMatcher.synced(head)
syncMatcher = nil
}
@ -79,19 +81,32 @@ func (f *FilterMaps) updateLoop() {
}
delay := time.Second * 20
if f.testHook != nil {
f.testHook(testHookWait)
delay = 0
}
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case syncMatcher = <-f.matcherSyncCh:
head = f.chain.CurrentBlock()
case <-f.closeCh:
stop = true
case <-time.After(delay):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
loop:
for {
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case syncMatcher = <-f.matcherSyncCh:
head = f.chain.CurrentBlock()
case <-f.closeCh:
stop = true
case ch := <-f.waitIdleCh:
head = f.chain.CurrentBlock()
if head.Hash() == f.getRange().headBlockHash {
ch <- true
continue loop
}
ch <- false
case <-time.After(delay):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
if f.testHook != nil {
f.testHook(testHookWait)
}
}
break
}
}
for head == nil {
@ -150,6 +165,18 @@ func (f *FilterMaps) updateLoop() {
}
}
// WaitIdle blocks until the indexer is in an idle state while synced up to the
// latest chain head.
func (f *FilterMaps) WaitIdle() {
for {
ch := make(chan bool)
f.waitIdleCh <- ch
if <-ch {
return
}
}
}
// getRange returns the current filterMapsRange.
func (f *FilterMaps) getRange() filterMapsRange {
f.lock.RLock()
@ -804,7 +831,7 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
batch := f.db.NewBatch()
afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap)
if rp.mapIndex >= afterLastMap {
if rp.mapIndex > afterLastMap {
return errors.New("cannot revert (head map behind revert point)")
}
lvPointer := uint64(rp.mapIndex) << f.logValuesPerMap

@ -1,6 +1,8 @@
package filtermaps
import (
"crypto/sha256"
"fmt"
"math/big"
"math/rand"
"sync"
@ -59,6 +61,25 @@ func TestIndexerRandomSetHistory(t *testing.T) {
ts.checkLvRange(50)
}
func TestIndexerDbEquality(t *testing.T) {
ts := newTestSetup(t)
ts.setHistory(0, false)
for i := 0; i < 10; i++ {
ts.chain.addBlocks(100, 10, 3, 4, true)
ts.runUntilWait()
}
hash1 := ts.fmDbHash()
fmt.Println(hash1)
ts.setHistory(500, false)
ts.runUntilWait()
hash2 := ts.fmDbHash()
fmt.Println(hash2)
ts.setHistory(0, false)
ts.runUntilWait()
hash3 := ts.fmDbHash()
fmt.Println(hash3)
}
type testSetup struct {
t *testing.T
fm *FilterMaps
@ -94,9 +115,14 @@ func (ts *testSetup) runUntil(stop func() bool) {
}
func (ts *testSetup) runUntilWait() {
ts.nextEvent()
for ts.lastEvent != testHookWait {
for {
ts.nextEvent()
for ts.lastEvent != testHookWait {
ts.nextEvent()
}
if ts.fm.getRange().headBlockHash == ts.chain.CurrentBlock().Hash() {
return
}
}
}
@ -146,6 +172,19 @@ func (ts *testSetup) stopFm() {
ts.fm.closeWg.Wait()
}
func (ts *testSetup) fmDbHash() common.Hash {
hasher := sha256.New()
it := ts.db.NewIterator(nil, nil)
for it.Next() {
hasher.Write(it.Key())
hasher.Write(it.Value())
}
it.Release()
var result common.Hash
hasher.Sum(result[:0])
return result
}
func (ts *testSetup) close() {
ts.stopFm()
ts.db.Close()
@ -178,7 +217,7 @@ func (tc *testChain) CurrentBlock() *types.Header {
return tc.blocks[tc.canonical[len(tc.canonical)-1]].Header()
}
func (tc *testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
func (tc *testChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return tc.chainHeadFeed.Subscribe(ch)
}
@ -281,5 +320,32 @@ func (tc *testChain) addBlocks(count, maxTxPerBlock, maxLogsPerReceipt, maxTopic
tc.receipts[hash] = types.Receipts{}
}
}
tc.chainHeadFeed.Send(core.ChainHeadEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
}
func (tc *testChain) setHead(headNum int) {
tc.lock.Lock()
defer tc.lock.Unlock()
tc.canonical = tc.canonical[:headNum+1]
tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
}
func (tc *testChain) getCanonicalChain() []common.Hash {
tc.lock.RLock()
defer tc.lock.RUnlock()
cc := make([]common.Hash, len(tc.canonical))
copy(cc, tc.canonical)
return cc
}
// restore an earlier state of the chain
func (tc *testChain) setCanonicalChain(cc []common.Hash) {
tc.lock.Lock()
defer tc.lock.Unlock()
tc.canonical = make([]common.Hash, len(cc))
copy(tc.canonical, cc)
tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
}

@ -118,6 +118,13 @@ func (fm *FilterMapsMatcherBackend) synced(head *types.Header) {
// range that has not been changed and has been consistent with all states of the
// chain since the previous SyncLogIndex or the creation of the matcher backend.
func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) {
if fm.f.noHistory {
head := fm.f.chain.CurrentBlock()
if head == nil {
return SyncRange{}, errors.New("canonical chain head not available")
}
return SyncRange{Head: head}, nil
}
// add SyncRange return channel, ensuring that
syncCh := make(chan SyncRange, 1)
fm.f.lock.Lock()

@ -17,7 +17,6 @@
package rawdb
import (
"bytes"
"math/big"
"testing"
@ -25,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/blocktest"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)

@ -375,6 +375,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
accountSnaps stat
storageSnaps stat
preimages stat
filterMaps stat
beaconHeaders stat
cliqueSnaps stat
@ -425,6 +426,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
codes.Add(size)
case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength):
txLookups.Add(size)
case bytes.HasPrefix(key, FilterMapsPrefix):
filterMaps.Add(size)
case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength):
accountSnaps.Add(size)
case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength):
@ -499,6 +502,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
{"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()},
{"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()},
{"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()},
{"Key-Value store", "Log search index", filterMaps.Size(), filterMaps.Count()},
{"Key-Value store", "Contract codes", codes.Size(), codes.Count()},
{"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()},
{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},

@ -37,9 +37,10 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
bbMatchCount uint64
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
rangeLogsTestHook chan rangeLogsTestEvent
}
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
@ -131,10 +132,31 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return f.rangeLogs(ctx, begin, end)
}
const (
rangeLogsTestSync = iota
rangeLogsTestTrimmed
rangeLogsTestIndexed
rangeLogsTestUnindexed
rangeLogsTestDone
)
type rangeLogsTestEvent struct {
event int
begin, end uint64
}
func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) {
if f.rangeLogsTestHook != nil {
defer func() {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0}
close(f.rangeLogsTestHook)
}()
}
if firstBlock > lastBlock {
return nil, errors.New("invalid search range")
return nil, nil
}
mb := f.sys.backend.NewMatcherBackend()
defer mb.Close()
@ -144,6 +166,21 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
if err != nil {
return nil, err
}
if !syncRange.Indexed {
// fallback to completely unindexed search
headNum := syncRange.Head.Number.Uint64()
if firstBlock > headNum {
firstBlock = headNum
}
if lastBlock > headNum {
lastBlock = headNum
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, firstBlock, lastBlock}
}
return f.unindexedLogs(ctx, firstBlock, lastBlock)
}
headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil
// if haveMatches == true then matches correspond to the block number range
// between matchFirst and matchLast
@ -157,7 +194,7 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
return
}
if trimLast < matchFirst || trimFirst > matchLast {
matches, haveMatches = nil, false
matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0
return
}
if trimFirst > matchFirst {
@ -192,6 +229,9 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
var trimTailIfNotValid uint64
if haveMatches && matchFirst > searchFirst {
// missing tail section; do unindexed search
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, matchFirst - 1}
}
tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1)
if err != nil {
return matches, err
@ -200,56 +240,67 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
matchFirst = searchFirst
// unindexed results are not affected by valid tail; do not trim tail
trimTailIfNotValid = math.MaxUint64
}
// now if we have matches, they start at searchFirst
if haveMatches {
searchFirst = matchLast + 1
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst {
} else {
// if we have matches, they start at searchFirst
if haveMatches {
searchFirst = matchLast + 1
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst {
forceUnindexed = true
}
}
var newMatches []*types.Log
if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst {
forceUnindexed = true
}
}
var newMatches []*types.Log
if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst {
forceUnindexed = true
}
if !forceUnindexed {
if syncRange.FirstIndexed > searchFirst {
searchFirst = syncRange.FirstIndexed
if !forceUnindexed {
if syncRange.FirstIndexed > searchFirst {
searchFirst = syncRange.FirstIndexed
}
if syncRange.LastIndexed < searchLast {
searchLast = syncRange.LastIndexed
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, searchFirst, searchLast}
}
newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast)
// trim tail if it affects the indexed search range
trimTailIfNotValid = searchFirst
if err == filtermaps.ErrMatchAll {
// "match all" filters are not supported by filtermaps; fall back
// to unindexed search which is the most efficient in this case
forceUnindexed = true
}
}
if syncRange.LastIndexed > searchLast {
searchLast = syncRange.LastIndexed
if forceUnindexed {
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, searchLast}
}
newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast)
// unindexed results are not affected by valid tail; do not trim tail
trimTailIfNotValid = math.MaxUint64
}
newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast)
// trim tail if it affects the indexed search range
trimTailIfNotValid = searchFirst
if err == filtermaps.ErrMatchAll {
// "match all" filters are not supported by filtermaps; fall back
// to unindexed search which is the most efficient in this case
forceUnindexed = true
if err != nil {
return matches, err
}
if !haveMatches {
matches = newMatches
haveMatches, matchFirst, matchLast = true, searchFirst, searchLast
} else {
matches = append(matches, newMatches...)
matchLast = searchLast
}
}
if forceUnindexed {
newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast)
// unindexed results are not affected by valid tail; do not trim tail
trimTailIfNotValid = math.MaxUint64
}
if err != nil {
return matches, err
}
if matches == nil {
matches = newMatches
haveMatches, matchFirst, matchLast = true, searchFirst, searchLast
} else {
matches = append(matches, newMatches...)
matchLast = searchLast
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: matchFirst, end: matchLast}
}
syncRange, err = mb.SyncLogIndex(ctx)
if err != nil {
return matches, err
}
headBlock = syncRange.Head.Number.Uint64() // Head is guaranteed != nil
if !syncRange.Valid {
matches, haveMatches = nil, false
matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0
} else {
if syncRange.FirstValid > trimTailIfNotValid {
trimMatches(syncRange.FirstValid, syncRange.LastValid)
@ -257,37 +308,42 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
trimMatches(0, syncRange.LastValid)
}
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: matchFirst, end: matchLast}
}
}
}
func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) {
logs, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics)
logs = filterLogs(logs, nil, nil, f.addresses, f.topics)
return logs, err
potentialMatches, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics)
matches := filterLogs(potentialMatches, nil, nil, f.addresses, f.topics)
log.Trace("Performed indexed log search", "begin", begin, "end", end, "true matches", len(matches), "false positives", len(potentialMatches)-len(matches))
return matches, err
}
// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
log.Warn("Performing unindexed log search", "begin", begin, "end", end)
var logs []*types.Log
var matches []*types.Log
for blockNumber := begin; blockNumber <= end; blockNumber++ {
select {
case <-ctx.Done():
return logs, ctx.Err()
return matches, ctx.Err()
default:
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber))
if header == nil || err != nil {
return logs, err
return matches, err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
return matches, err
}
logs = append(logs, found...)
matches = append(matches, found...)
}
return logs, nil
log.Trace("Performed unindexed log search", "begin", begin, "end", end, "matches", len(matches))
return matches, nil
}
// blockLogs returns the logs matching the filter criteria within a single block.

@ -40,6 +40,7 @@ import (
type testBackend struct {
db ethdb.Database
fm *filtermaps.FilterMaps
sections uint64
txFeed event.Feed
logsFeed event.Feed
@ -58,10 +59,28 @@ func (b *testBackend) CurrentHeader() *types.Header {
return hdr
}
func (b *testBackend) CurrentBlock() *types.Header {
return b.CurrentHeader()
}
func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}
func (b *testBackend) GetCanonicalHash(number uint64) common.Hash {
return rawdb.ReadCanonicalHash(b.db, number)
}
func (b *testBackend) GetHeader(hash common.Hash, number uint64) *types.Header {
hdr, _ := b.HeaderByHash(context.Background(), hash)
return hdr
}
func (b *testBackend) GetReceiptsByHash(hash common.Hash) types.Receipts {
r, _ := b.GetReceipts(context.Background(), hash)
return r
}
func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
var (
hash common.Hash
@ -137,9 +156,20 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
}
func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
fm := filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, 0, false)
fm.Start()
return fm.NewMatcherBackend()
return b.fm.NewMatcherBackend()
}
func (b *testBackend) startFilterMaps(history uint64, noHistory bool) {
b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory)
b.fm.Start()
if !noHistory {
b.fm.WaitIdle()
}
}
func (b *testBackend) stopFilterMaps() {
b.fm.Stop()
b.fm = nil
}
func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {

@ -46,15 +46,27 @@ func makeReceipt(addr common.Address) *types.Receipt {
return receipt
}
func BenchmarkFilters(b *testing.B) {
func BenchmarkFiltersIndexed(b *testing.B) {
benchmarkFilters(b, 0, false)
}
func BenchmarkFiltersHalfIndexed(b *testing.B) {
benchmarkFilters(b, 50000, false)
}
func BenchmarkFiltersUnindexed(b *testing.B) {
benchmarkFilters(b, 0, true)
}
func benchmarkFilters(b *testing.B, history uint64, noHistory bool) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(b, db, Config{})
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(b, db, Config{})
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
gspec = &core.Genesis{
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000)}},
@ -94,9 +106,12 @@ func BenchmarkFilters(b *testing.B) {
rawdb.WriteHeadBlockHash(db, block.Hash())
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
}
backend.startFilterMaps(history, noHistory)
defer backend.stopFilterMaps()
b.ResetTimer()
filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
filter.begin = 0
@ -107,7 +122,19 @@ func BenchmarkFilters(b *testing.B) {
}
}
func TestFilters(t *testing.T) {
func TestFiltersIndexed(t *testing.T) {
testFilters(t, 0, false)
}
func TestFiltersHalfIndexed(t *testing.T) {
testFilters(t, 500, false)
}
func TestFiltersUnindexed(t *testing.T) {
testFilters(t, 0, true)
}
func testFilters(t *testing.T, history uint64, noHistory bool) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
@ -279,6 +306,9 @@ func TestFilters(t *testing.T) {
})
backend.setPending(pchain[0], preceipts[0])
backend.startFilterMaps(history, noHistory)
defer backend.stopFilterMaps()
for i, tc := range []struct {
f *Filter
want string
@ -387,3 +417,137 @@ func TestFilters(t *testing.T) {
}
})
}
func TestRangeLogs(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
gspec = &core.Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{},
BaseFee: big.NewInt(params.InitialBaseFee),
}
)
_, err := gspec.Commit(db, triedb.NewDatabase(db, nil))
if err != nil {
t.Fatal(err)
}
chain, _ := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {})
var l uint64
bc, err := core.NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, &l)
if err != nil {
t.Fatal(err)
}
_, err = bc.InsertChain(chain[:600])
if err != nil {
t.Fatal(err)
}
backend.startFilterMaps(200, false)
defer backend.stopFilterMaps()
var (
testCase, event int
filter *Filter
addresses = []common.Address{common.Address{}}
)
newFilter := func(begin, end int64) {
testCase++
event = 0
filter = sys.NewRangeFilter(begin, end, addresses, nil)
filter.rangeLogsTestHook = make(chan rangeLogsTestEvent)
go func(filter *Filter) {
filter.Logs(context.Background())
// ensure that filter will not be blocked if we exit early
for _ = range filter.rangeLogsTestHook {
}
}(filter)
}
expEvent := func(exp rangeLogsTestEvent) {
event++
ev := <-filter.rangeLogsTestHook
if ev != exp {
t.Fatalf("Test case #%d: wrong test event #%d received (got %v, expected %v)", testCase, event, ev, exp)
}
}
// test case #1
newFilter(300, 500)
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400})
if _, err := bc.InsertChain(chain[600:700]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
// test case #2
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700})
if _, err := bc.InsertChain(chain[700:800]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 701, 800})
if err := bc.SetHead(750); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
// test case #3
newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
if err := bc.SetHead(740); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740})
if _, err := bc.InsertChain(chain[740:750]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740})
// trimmed at the beginning of the next iteration
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 740, 740})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
// test case #4
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550})
if _, err := bc.InsertChain(chain[750:1000]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750})
// indexed range affected by tail pruning so we have to discard the entire
// match set
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000})
}

@ -43,6 +43,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@ -619,6 +620,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
panic("implement me")
}
func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
panic("implement me")
}
func TestEstimateGas(t *testing.T) {
t.Parallel()
// Initialize test accounts

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@ -399,3 +400,5 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
}
func (b *backendMock) Engine() consensus.Engine { return nil }
func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil }

Loading…
Cancel
Save