core/filtermaps: added more tests

pull/30370/head
Zsolt Felfoldi 2 months ago
parent fb57e6316b
commit 3ce3b80bb3
  1. 1
      core/filtermaps/filtermaps.go
  2. 63
      core/filtermaps/indexer.go
  3. 263
      core/filtermaps/indexer_test.go
  4. 86
      core/filtermaps/matcher_test.go
  5. 4
      core/rawdb/schema.go
  6. 4
      eth/filters/filter_system_test.go

@ -72,7 +72,6 @@ type FilterMaps struct {
revertPoints map[uint64]*revertPoint revertPoints map[uint64]*revertPoint
waitIdleCh chan chan bool waitIdleCh chan chan bool
testHook func(int)
} }
// filterMap is a full or partial in-memory representation of a filter map where // filterMap is a full or partial in-memory representation of a filter map where

@ -35,28 +35,10 @@ const (
cachedRevertPoints = 64 // revert points for most recent blocks in memory cachedRevertPoints = 64 // revert points for most recent blocks in memory
) )
const (
testHookInit = iota
testHookUpdateHeadEpoch
testHookUpdateHead
testHookExtendTailEpoch
testHookExtendTail
testHookPruneTail
testHookPruneTailMaps
testHookRevert
testHookWait
testHookStop
)
// updateLoop initializes and updates the log index structure according to the // updateLoop initializes and updates the log index structure according to the
// canonical chain. // canonical chain.
func (f *FilterMaps) updateLoop() { func (f *FilterMaps) updateLoop() {
defer func() { defer f.closeWg.Done()
f.closeWg.Done()
if f.testHook != nil {
f.testHook(testHookStop)
}
}()
if f.noHistory { if f.noHistory {
f.reset() f.reset()
@ -95,10 +77,6 @@ func (f *FilterMaps) updateLoop() {
if stop { if stop {
return return
} }
delay := time.Second * 20
if f.testHook != nil {
delay = 0
}
loop: loop:
for { for {
select { select {
@ -115,12 +93,9 @@ func (f *FilterMaps) updateLoop() {
continue loop continue loop
} }
ch <- false ch <- false
case <-time.After(delay): case <-time.After(time.Second * 20):
// keep updating log index during syncing // keep updating log index during syncing
head = f.chain.CurrentBlock() head = f.chain.CurrentBlock()
if f.testHook != nil {
f.testHook(testHookWait)
}
} }
break break
} }
@ -184,6 +159,10 @@ func (f *FilterMaps) updateLoop() {
// WaitIdle blocks until the indexer is in an idle state while synced up to the // WaitIdle blocks until the indexer is in an idle state while synced up to the
// latest chain head. // latest chain head.
func (f *FilterMaps) WaitIdle() { func (f *FilterMaps) WaitIdle() {
if f.noHistory {
f.closeWg.Wait()
return
}
for { for {
ch := make(chan bool) ch := make(chan bool)
f.waitIdleCh <- ch f.waitIdleCh <- ch
@ -219,9 +198,6 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
log.Error("Could not initialize log index", "error", err) log.Error("Could not initialize log index", "error", err)
} }
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookInit)
}
return true return true
} }
@ -295,16 +271,10 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
if update.updatedRangeLength() >= f.mapsPerEpoch { if update.updatedRangeLength() >= f.mapsPerEpoch {
// limit the amount of data updated in a single batch // limit the amount of data updated in a single batch
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookUpdateHeadEpoch)
}
update = f.newUpdateBatch() update = f.newUpdateBatch()
} }
} }
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookUpdateHead)
}
return true return true
} }
@ -342,9 +312,6 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch { if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
// limit the amount of data updated in a single batch // limit the amount of data updated in a single batch
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookExtendTailEpoch)
}
update = f.newUpdateBatch() update = f.newUpdateBatch()
lastTailEpoch = tailEpoch lastTailEpoch = tailEpoch
} }
@ -365,9 +332,6 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
number, parentHash = newTail.Number.Uint64(), newTail.ParentHash number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
} }
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookExtendTail)
}
return number <= tailTarget return number <= tailTarget
} }
@ -406,9 +370,6 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
fmr.tailBlockLvPointer = targetLvPointer fmr.tailBlockLvPointer = targetLvPointer
f.setRange(f.db, fmr) f.setRange(f.db, fmr)
if f.testHook != nil {
f.testHook(testHookPruneTail)
}
} }
// tryPruneTailMaps removes unused filter maps and corresponding log index // tryPruneTailMaps removes unused filter maps and corresponding log index
@ -461,6 +422,9 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
batch := f.db.NewBatch() batch := f.db.NewBatch()
for *removeLvPtr < nextBlockNumber { for *removeLvPtr < nextBlockNumber {
f.deleteBlockLvPointer(batch, *removeLvPtr) f.deleteBlockLvPointer(batch, *removeLvPtr)
if (*removeLvPtr)%revertPointFrequency == 0 {
rawdb.DeleteRevertPoint(batch, *removeLvPtr)
}
(*removeLvPtr)++ (*removeLvPtr)++
} }
for mapIndex := first; mapIndex < afterLast; mapIndex++ { for mapIndex := first; mapIndex < afterLast; mapIndex++ {
@ -481,9 +445,6 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err) log.Crit("Could not write update batch", "error", err)
} }
if f.testHook != nil {
f.testHook(testHookPruneTailMaps)
}
} }
// updateBatch is a memory overlay collecting changes to the index log structure // updateBatch is a memory overlay collecting changes to the index log structure
@ -873,6 +834,9 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
} }
for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ { for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ {
f.deleteBlockLvPointer(batch, blockNumber) f.deleteBlockLvPointer(batch, blockNumber)
if blockNumber%revertPointFrequency == 0 {
rawdb.DeleteRevertPoint(batch, blockNumber)
}
} }
newRange := f.filterMapsRange newRange := f.filterMapsRange
newRange.headLvPointer = lvPointer newRange.headLvPointer = lvPointer
@ -882,8 +846,5 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err) log.Crit("Could not write update batch", "error", err)
} }
if f.testHook != nil {
f.testHook(testHookRevert)
}
return nil return nil
} }

@ -18,7 +18,6 @@ package filtermaps
import ( import (
"crypto/sha256" "crypto/sha256"
"fmt"
"math/big" "math/big"
"math/rand" "math/rand"
"sync" "sync"
@ -40,72 +39,156 @@ var testParams = Params{
logValuesPerMap: 4, logValuesPerMap: 4,
} }
func TestIndexerSetHistory(t *testing.T) { func TestIndexerRandomRange(t *testing.T) {
ts := newTestSetup(t) ts := newTestSetup(t)
ts.setHistory(0, false) defer ts.close()
forks := make([][]common.Hash, 10)
ts.chain.addBlocks(1000, 5, 2, 4, false) // 50 log values per block ts.chain.addBlocks(1000, 5, 2, 4, false) // 50 log values per block
ts.runUntilWait() for i := range forks {
ts.checkLvRange(50) if i != 0 {
ts.setHistory(100, false) forkBlock := rand.Intn(1000)
ts.runUntil(func() bool { ts.chain.setHead(forkBlock)
l := ts.lastRange.headLvPointer - ts.lastRange.tailLvPointer ts.chain.addBlocks(1000-forkBlock, 5, 2, 4, false) // 50 log values per block
return l > 44000 && l < 45000 }
}) forks[i] = ts.chain.getCanonicalChain()
ts.setHistory(200, false) }
ts.runUntilWait()
ts.checkLvRange(50)
ts.setHistory(0, false) ts.setHistory(0, false)
ts.runUntilWait() var (
ts.checkLvRange(50) history int
} noHistory bool
fork, head = len(forks) - 1, 1000
func TestIndexerRandomSetHistory(t *testing.T) { )
ts := newTestSetup(t) ts.fm.WaitIdle()
ts.chain.addBlocks(100, 5, 2, 4, false) // 50 log values per block for i := 0; i < 200; i++ {
for i := 0; i < 3000; i++ { switch rand.Intn(2) {
ts.setHistory(uint64(rand.Intn(1001)), false) case 0:
ts.nextEvent() // change history settings
for rand.Intn(20) != 0 && ts.lastEvent != testHookWait { switch rand.Intn(10) {
ts.nextEvent() case 0:
history, noHistory = 0, false
case 1:
history, noHistory = 0, true
default:
history, noHistory = rand.Intn(1000)+1, false
}
ts.setHistory(uint64(history), noHistory)
case 1:
// change head
fork, head = rand.Intn(len(forks)), rand.Intn(1001)
ts.chain.setCanonicalChain(forks[fork][:head+1])
}
ts.fm.WaitIdle()
fmr := ts.fm.getRange()
if noHistory {
if fmr.initialized {
t.Fatalf("filterMapsRange initialized while indexing is disabled")
}
continue
} }
if ts.lastEvent == testHookWait { if !fmr.initialized {
ts.checkLvRange(50) t.Fatalf("filterMapsRange not initialized while indexing is enabled")
}
var (
tail int
tpHash common.Hash
)
if history > 0 && history <= head {
tail = head + 1 - history
}
if tail > 0 {
tpHash = forks[fork][tail-1]
}
if fmr.headBlockNumber != uint64(head) || fmr.headBlockHash != forks[fork][head] {
ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], fmr.headBlockNumber, fmr.headBlockHash)
}
if fmr.tailBlockNumber != uint64(tail) || fmr.tailParentHash != tpHash {
ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, fmr.tailBlockNumber, fmr.tailParentHash)
}
expLvCount := uint64(head+1-tail) * 50
if tail == 0 {
expLvCount -= 50 // no logs in genesis block
}
if fmr.headLvPointer-fmr.tailBlockLvPointer != expLvCount {
ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, fmr.headLvPointer-fmr.tailBlockLvPointer)
}
if fmr.tailBlockLvPointer-fmr.tailLvPointer >= ts.params.valuesPerMap {
ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, fmr.tailBlockLvPointer-fmr.tailLvPointer)
} }
} }
ts.setHistory(0, false)
ts.runUntilWait()
ts.checkLvRange(50)
} }
func TestIndexerDbEquality(t *testing.T) { func TestIndexerCompareDb(t *testing.T) {
ts := newTestSetup(t) ts := newTestSetup(t)
defer ts.close()
ts.setHistory(0, false) ts.setHistory(0, false)
for i := 0; i < 10; i++ { ts.chain.addBlocks(500, 10, 3, 4, true)
ts.chain.addBlocks(100, 10, 3, 4, true) ts.fm.WaitIdle()
ts.runUntilWait() // revert points are stored after block 500
} ts.chain.addBlocks(500, 10, 3, 4, true)
hash1 := ts.fmDbHash() ts.fm.WaitIdle()
fmt.Println(hash1) chain1 := ts.chain.getCanonicalChain()
ts.setHistory(500, false) ts.storeDbHash("chain 1 [0, 1000]")
ts.runUntilWait()
hash2 := ts.fmDbHash() ts.chain.setHead(600)
fmt.Println(hash2) ts.fm.WaitIdle()
ts.storeDbHash("chain 1/2 [0, 600]")
ts.chain.addBlocks(600, 10, 3, 4, true)
ts.fm.WaitIdle()
chain2 := ts.chain.getCanonicalChain()
ts.storeDbHash("chain 2 [0, 1200]")
ts.setHistory(800, false)
ts.fm.WaitIdle()
ts.storeDbHash("chain 2 [401, 1200]")
ts.chain.setHead(600)
ts.fm.WaitIdle()
ts.checkDbHash("chain 1/2 [0, 600]")
ts.chain.setCanonicalChain(chain1)
ts.fm.WaitIdle()
ts.storeDbHash("chain 1 [201, 1000]")
ts.setHistory(0, false) ts.setHistory(0, false)
ts.runUntilWait() ts.fm.WaitIdle()
hash3 := ts.fmDbHash() ts.checkDbHash("chain 1 [0, 1000]")
fmt.Println(hash3)
ts.setHistory(0, true)
ts.fm.WaitIdle()
ts.storeDbHash("no index")
ts.chain.setCanonicalChain(chain2[:501])
ts.setHistory(0, false)
ts.fm.WaitIdle()
ts.chain.setCanonicalChain(chain2)
ts.fm.WaitIdle()
ts.checkDbHash("chain 2 [0, 1200]")
ts.chain.setCanonicalChain(chain1)
ts.fm.WaitIdle()
ts.setHistory(800, false)
ts.fm.WaitIdle()
ts.checkDbHash("chain 1 [201, 1000]")
ts.chain.setCanonicalChain(chain2)
ts.fm.WaitIdle()
ts.checkDbHash("chain 2 [401, 1200]")
ts.setHistory(0, true)
ts.fm.WaitIdle()
ts.checkDbHash("no index")
} }
type testSetup struct { type testSetup struct {
t *testing.T t *testing.T
fm *FilterMaps fm *FilterMaps
db ethdb.Database db ethdb.Database
chain *testChain chain *testChain
params Params params Params
eventCh chan int dbHashes map[string]common.Hash
resumeCh chan struct{}
lastEvent int
lastRange filterMapsRange
} }
func newTestSetup(t *testing.T) *testSetup { func newTestSetup(t *testing.T) *testSetup {
@ -116,76 +199,32 @@ func newTestSetup(t *testing.T) *testSetup {
chain: newTestChain(), chain: newTestChain(),
db: rawdb.NewMemoryDatabase(), db: rawdb.NewMemoryDatabase(),
params: params, params: params,
eventCh: make(chan int), dbHashes: make(map[string]common.Hash),
resumeCh: make(chan struct{}),
}
}
func (ts *testSetup) runUntil(stop func() bool) {
for !stop() {
ts.nextEvent()
for ts.lastEvent == testHookWait {
ts.t.Fatalf("Indexer in waiting state before runUntil condition is met")
}
}
}
func (ts *testSetup) runUntilWait() {
for {
ts.nextEvent()
for ts.lastEvent != testHookWait {
ts.nextEvent()
}
if ts.fm.getRange().headBlockHash == ts.chain.CurrentBlock().Hash() {
return
}
}
}
func (ts *testSetup) checkLvRange(lvPerBlock uint64) {
expBlockCount := uint64(len(ts.chain.canonical) - 1)
if ts.fm.history != 0 && ts.fm.history < expBlockCount {
expBlockCount = ts.fm.history
}
if ts.lastRange.headLvPointer-ts.lastRange.tailBlockLvPointer != expBlockCount*lvPerBlock {
ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expBlockCount*lvPerBlock, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer)
}
if ts.lastRange.tailBlockLvPointer-ts.lastRange.tailLvPointer >= ts.params.valuesPerMap {
ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, ts.lastRange.tailBlockLvPointer-ts.lastRange.tailLvPointer)
} }
} }
func (ts *testSetup) setHistory(history uint64, noHistory bool) { func (ts *testSetup) setHistory(history uint64, noHistory bool) {
if ts.fm != nil { if ts.fm != nil {
ts.stopFm() ts.fm.Stop()
} }
ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, noHistory) ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, noHistory)
ts.fm.testHook = ts.testHook
ts.fm.Start() ts.fm.Start()
ts.lastEvent = <-ts.eventCh
}
func (ts *testSetup) testHook(event int) {
ts.eventCh <- event
<-ts.resumeCh
} }
func (ts *testSetup) nextEvent() { func (ts *testSetup) storeDbHash(id string) {
ts.resumeCh <- struct{}{} dbHash := ts.fmDbHash()
ts.lastEvent = <-ts.eventCh for otherId, otherHash := range ts.dbHashes {
ts.lastRange = ts.fm.getRange() if otherHash == dbHash {
ts.t.Fatalf("Unexpected equal database hashes `%s` and `%s`", id, otherId)
}
}
ts.dbHashes[id] = dbHash
} }
func (ts *testSetup) stopFm() { func (ts *testSetup) checkDbHash(id string) {
close(ts.fm.closeCh) if ts.fmDbHash() != ts.dbHashes[id] {
for { ts.t.Fatalf("Database `%s` hash mismatch", id)
ts.nextEvent()
if ts.lastEvent == testHookStop {
break
}
} }
ts.resumeCh <- struct{}{}
ts.fm.closeWg.Wait()
} }
func (ts *testSetup) fmDbHash() common.Hash { func (ts *testSetup) fmDbHash() common.Hash {
@ -202,7 +241,9 @@ func (ts *testSetup) fmDbHash() common.Hash {
} }
func (ts *testSetup) close() { func (ts *testSetup) close() {
ts.stopFm() if ts.fm != nil {
ts.fm.Stop()
}
ts.db.Close() ts.db.Close()
ts.chain.db.Close() ts.chain.db.Close()
} }

@ -0,0 +1,86 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package filtermaps
import (
"context"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
)
func TestMatcher(t *testing.T) {
ts := newTestSetup(t)
defer ts.close()
ts.chain.addBlocks(1000, 10, 10, 4, true)
ts.setHistory(0, false)
ts.fm.WaitIdle()
for i := 0; i < 500; i++ {
bhash := ts.chain.canonical[rand.Intn(len(ts.chain.canonical))]
receipts := ts.chain.receipts[bhash]
if len(receipts) == 0 {
continue
}
receipt := receipts[rand.Intn(len(receipts))]
if len(receipt.Logs) == 0 {
continue
}
log := receipt.Logs[rand.Intn(len(receipt.Logs))]
var ok bool
addresses := make([]common.Address, rand.Intn(3))
for i := range addresses {
rand.Read(addresses[i][:])
}
if len(addresses) > 0 {
addresses[rand.Intn(len(addresses))] = log.Address
ok = true
}
topics := make([][]common.Hash, rand.Intn(len(log.Topics)+1))
for j := range topics {
topics[j] = make([]common.Hash, rand.Intn(3))
for i := range topics[j] {
rand.Read(topics[j][i][:])
}
if len(topics[j]) > 0 {
topics[j][rand.Intn(len(topics[j]))] = log.Topics[j]
ok = true
}
}
if !ok {
continue // cannot search for match-all pattern
}
mb := ts.fm.NewMatcherBackend()
logs, err := GetPotentialMatches(context.Background(), mb, 0, 1000, addresses, topics)
mb.Close()
if err != nil {
t.Fatalf("Log search error: %v", err)
}
var found bool
for _, l := range logs {
if l == log {
found = true
break
}
}
if !found {
t.Fatalf("Log search did not return expected log (addresses: %v, topics: %v, expected log: %v)", addresses, topics, *log)
}
}
}

@ -347,14 +347,14 @@ func IsStorageTrieNode(key []byte) bool {
// filterMapRowKey = filterMapRowPrefix + mapRowIndex (uint64 big endian) // filterMapRowKey = filterMapRowPrefix + mapRowIndex (uint64 big endian)
func filterMapRowKey(mapRowIndex uint64) []byte { func filterMapRowKey(mapRowIndex uint64) []byte {
key := append(filterMapRowPrefix, make([]byte, 8)...) key := append(filterMapRowPrefix, make([]byte, 8)...)
binary.BigEndian.PutUint64(key[1:], mapRowIndex) binary.BigEndian.PutUint64(key[len(filterMapRowPrefix):], mapRowIndex)
return key return key
} }
// filterMapBlockPtrKey = filterMapBlockPtrPrefix + mapIndex (uint32 big endian) // filterMapBlockPtrKey = filterMapBlockPtrPrefix + mapIndex (uint32 big endian)
func filterMapBlockPtrKey(mapIndex uint32) []byte { func filterMapBlockPtrKey(mapIndex uint32) []byte {
key := append(filterMapBlockPtrPrefix, make([]byte, 4)...) key := append(filterMapBlockPtrPrefix, make([]byte, 4)...)
binary.BigEndian.PutUint32(key[1:], mapIndex) binary.BigEndian.PutUint32(key[len(filterMapBlockPtrPrefix):], mapIndex)
return key return key
} }

@ -162,9 +162,7 @@ func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
func (b *testBackend) startFilterMaps(history uint64, noHistory bool) { func (b *testBackend) startFilterMaps(history uint64, noHistory bool) {
b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory) b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory)
b.fm.Start() b.fm.Start()
if !noHistory { b.fm.WaitIdle()
b.fm.WaitIdle()
}
} }
func (b *testBackend) stopFilterMaps() { func (b *testBackend) stopFilterMaps() {

Loading…
Cancel
Save