core/filtermaps: add indexer test

pull/30370/head
Zsolt Felfoldi 2 months ago
parent d11db22a96
commit f8e98ae974
Changed files:
1. core/filtermaps/filtermaps.go (21 lines changed)
2. core/filtermaps/indexer.go (50 lines changed)
3. core/filtermaps/indexer_test.go (280 lines changed)
4. eth/backend.go (5 lines changed)

@@ -23,6 +23,7 @@ type blockchain interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
GetHeader(hash common.Hash, number uint64) *types.Header
GetCanonicalHash(number uint64) common.Hash
+ GetReceiptsByHash(hash common.Hash) types.Receipts
}
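This hunk widens the consumer-side chain interface: receipts are now fetched through the chain (GetReceiptsByHash) instead of being read straight from the database. Assembled from this hunk plus the methods the indexer is seen calling elsewhere in the diff (CurrentBlock at minimum), the interface looks roughly like the following sketch; the truncated hunk may elide further methods:

	// blockchain is the narrow view of the chain the indexer depends on
	// (reconstructed sketch, not verbatim source).
	type blockchain interface {
		CurrentBlock() *types.Header
		SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
		GetHeader(hash common.Hash, number uint64) *types.Header
		GetCanonicalHash(number uint64) common.Hash
		GetReceiptsByHash(hash common.Hash) types.Receipts // added by this commit
	}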
// FilterMaps is the in-memory representation of the log index structure that is
@@ -33,7 +34,7 @@ type blockchain interface {
// https://eips.ethereum.org/EIPS/eip-7745
type FilterMaps struct {
lock sync.RWMutex
- db ethdb.Database
+ db ethdb.KeyValueStore
closeCh chan struct{}
closeWg sync.WaitGroup
history uint64
@@ -53,6 +54,8 @@ type FilterMaps struct {
blockPtrCache *lru.Cache[uint32, uint64]
lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint
+ testHook func(int)
}
// filterMap is a full or partial in-memory representation of a filter map where
@@ -94,7 +97,7 @@ type filterMapsRange struct {
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain.
- func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
+ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
rs, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
@@ -128,14 +131,17 @@ func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history u
log.Error("Error fetching tail block pointer, resetting log index", "error", err)
fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
}
- fm.closeWg.Add(2)
- go fm.removeBloomBits()
- go fm.updateLoop()
return fm
}
+ func (f *FilterMaps) Start() {
+ f.closeWg.Add(2)
+ go f.removeBloomBits()
+ go f.updateLoop()
+ }
// Close ensures that the indexer is fully stopped before returning.
- func (f *FilterMaps) Close() {
+ func (f *FilterMaps) Stop() {
close(f.closeCh)
f.closeWg.Wait()
}
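Goroutine startup moves out of the constructor into Start, so the lifecycle becomes construct, then Start, then Stop. A minimal caller sketch (assuming a chain value implementing the blockchain interface and a package-level DefaultParams, neither shown in this diff):

	db := rawdb.NewMemoryDatabase()
	fm := NewFilterMaps(db, chain, DefaultParams, 1000, false) // keep ~1000 blocks of history
	fm.Start()      // spawns removeBloomBits and updateLoop
	defer fm.Stop() // closes closeCh, then waits on closeWg

Splitting construction from startup is what lets the test below install testHook after NewFilterMaps but before any goroutine runs.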
@@ -297,8 +303,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
}
}
// get block receipts
- hash := f.chain.GetCanonicalHash(firstBlockNumber)
- receipts := rawdb.ReadRawReceipts(f.db, hash, firstBlockNumber) //TODO small cache
+ receipts := f.chain.GetReceiptsByHash(f.chain.GetCanonicalHash(firstBlockNumber))
if receipts == nil {
return nil, errors.New("receipts not found")
}

@@ -22,7 +22,12 @@ const (
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
- defer f.closeWg.Done()
+ defer func() {
+ f.closeWg.Done()
+ if f.testHook != nil {
+ f.testHook(testHookStop)
+ }
+ }()
if f.noHistory {
f.reset()
@@ -38,7 +43,7 @@ func (f *FilterMaps) updateLoop() {
var (
headEventCh = make(chan core.ChainHeadEvent, 10)
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
- head *types.Header
+ head = f.chain.CurrentBlock()
stop bool
syncMatcher *FilterMapsMatcherBackend
)
@@ -59,16 +64,21 @@ func (f *FilterMaps) updateLoop() {
if stop {
return
}
+ delay := time.Second * 20
+ if f.testHook != nil {
+ f.testHook(testHookWait)
+ delay = 0
+ }
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case syncMatcher = <-f.matcherSyncCh:
head = f.chain.CurrentBlock()
- case <-time.After(time.Second * 20):
- // keep updating log index during syncing
- head = f.chain.CurrentBlock()
case <-f.closeCh:
stop = true
+ case <-time.After(delay):
+ // keep updating log index during syncing
+ head = f.chain.CurrentBlock()
}
}
for head == nil {
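Two behaviors change in the wait logic above: the head is now primed from CurrentBlock before the loop, and the 20-second re-check delay collapses to zero whenever a testHook is installed, so tests are paced by the hook handshake instead of wall-clock timers. The nil-checked hook pattern in isolation (a generic sketch with hypothetical names, not code from this commit):

	// pollDelay returns the loop's idle delay; with a hook installed it
	// reports the wait state and returns zero so tests never sleep.
	func pollDelay(hook func(int)) time.Duration {
		if hook != nil {
			hook(testHookWait)
			return 0
		}
		return 20 * time.Second
	}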
@@ -151,7 +161,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
if !f.reset() {
return false
}
- receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64())
+ receipts := f.chain.GetReceiptsByHash(head.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
return true
@@ -161,6 +171,9 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
log.Error("Could not initialize log index", "error", err)
}
f.applyUpdateBatch(update)
+ if f.testHook != nil {
+ f.testHook(testHookInit)
+ }
return true
}
@@ -222,7 +235,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
update := f.newUpdateBatch()
for i := len(newHeaders) - 1; i >= 0; i-- {
newHeader := newHeaders[i]
- receipts := rawdb.ReadRawReceipts(f.db, newHeader.Hash(), newHeader.Number.Uint64())
+ receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
break
@@ -234,10 +247,16 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
if update.updatedRangeLength() >= f.mapsPerEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
+ if f.testHook != nil {
+ f.testHook(testHookUpdateHeadEpoch)
+ }
update = f.newUpdateBatch()
}
}
f.applyUpdateBatch(update)
+ if f.testHook != nil {
+ f.testHook(testHookUpdateHead)
+ }
return true
}
@@ -273,6 +292,9 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
+ if f.testHook != nil {
+ f.testHook(testHookExtendTailEpoch)
+ }
update = f.newUpdateBatch()
lastTailEpoch = tailEpoch
}
@@ -281,7 +303,7 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
log.Error("Tail header not found", "number", number-1, "hash", parentHash)
break
}
- receipts := rawdb.ReadRawReceipts(f.db, newTail.Hash(), newTail.Number.Uint64())
+ receipts := f.chain.GetReceiptsByHash(newTail.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash())
break
@@ -293,6 +315,9 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
}
f.applyUpdateBatch(update)
+ if f.testHook != nil {
+ f.testHook(testHookExtendTail)
+ }
}
// pruneTailPtr updates the tail block number and hash and the corresponding
@@ -330,6 +355,9 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
fmr.tailBlockLvPointer = targetLvPointer
f.setRange(f.db, fmr)
+ if f.testHook != nil {
+ f.testHook(testHookPruneTail)
+ }
}
// tryPruneTailMaps removes unused filter maps and corresponding log index
@@ -401,6 +429,9 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
+ if f.testHook != nil {
+ f.testHook(testHookPruneTailMaps)
+ }
}
// updateBatch is a memory overlay collecting changes to the log index structure
@@ -799,5 +830,8 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
+ if f.testHook != nil {
+ f.testHook(testHookRevert)
+ }
return nil
}

@@ -0,0 +1,280 @@
package filtermaps

import (
	"math/big"
	"math/rand"
	"sync"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus/ethash"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/params"
)

const (
	testHookInit = iota
	testHookUpdateHeadEpoch
	testHookUpdateHead
	testHookExtendTailEpoch
	testHookExtendTail
	testHookPruneTail
	testHookPruneTailMaps
	testHookRevert
	testHookWait
	testHookStop
)

var testParams = Params{
	logMapHeight:    2,
	logMapsPerEpoch: 4,
	logValuesPerMap: 4,
}

func TestIndexerSetHistory(t *testing.T) {
	ts := newTestSetup(t)
	ts.setHistory(0, false)
	ts.chain.addBlocks(1000, 5, 2, 4, false) // 50 log values per block
	ts.runUntilWait()
	ts.setHistory(100, false)
	ts.runUntil(func() bool {
		l := ts.lastRange.headLvPointer - ts.lastRange.tailLvPointer
		return l > 44000 && l < 45000
	})
	ts.setHistory(200, false)
	ts.runUntilWait()
	ts.setHistory(0, false)
	ts.runUntilWait()
	if ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer != 50000 {
		t.Fatalf("Invalid number of log values in the final state (expected %d, got %d)", 50000, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer)
	}
}
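The "50 log values per block" comment follows from the generator parameters: in the filter map structure a log appears to occupy one log value for the address plus one per topic, so each block here yields 5 txs * 2 logs * (1 + 4) = 50 values, and 1000 blocks give the 50000 asserted above. A hypothetical helper mirroring that arithmetic (not part of the commit):

	// logValuesPerBlock mirrors the test generator's arithmetic:
	// txCount * logsPerReceipt * (one address value + one value per topic).
	func logValuesPerBlock(txCount, logsPerReceipt, topicsPerLog int) int {
		return txCount * logsPerReceipt * (1 + topicsPerLog)
	}

	// logValuesPerBlock(5, 2, 4) == 50, so addBlocks(1000, 5, 2, 4, false)
	// produces 1000 * 50 == 50000 log values.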
func TestIndexerRandomSetHistory(t *testing.T) {
	ts := newTestSetup(t)
	ts.chain.addBlocks(100, 5, 2, 4, false) // 50 log values per block
	for i := 0; i < 3000; i++ {
		ts.setHistory(uint64(rand.Intn(1001)), false)
		ts.nextEvent()
		for rand.Intn(20) != 0 && ts.lastEvent != testHookWait {
			ts.nextEvent()
		}
	}
	ts.setHistory(0, false)
	ts.runUntilWait()
	if ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer != 5000 {
		t.Fatalf("Invalid number of log values in the final state (expected %d, got %d)", 5000, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer)
	}
}

type testSetup struct {
	t         *testing.T
	fm        *FilterMaps
	db        ethdb.Database
	chain     *testChain
	eventCh   chan int
	resumeCh  chan struct{}
	lastEvent int
	lastRange filterMapsRange
}

func newTestSetup(t *testing.T) *testSetup {
	return &testSetup{
		t:        t,
		chain:    newTestChain(),
		db:       rawdb.NewMemoryDatabase(),
		eventCh:  make(chan int),
		resumeCh: make(chan struct{}),
	}
}

func (ts *testSetup) runUntil(stop func() bool) {
	for !stop() {
		ts.nextEvent()
		if ts.lastEvent == testHookWait {
			ts.t.Fatalf("Indexer in waiting state before runUntil condition is met")
		}
	}
}

func (ts *testSetup) runUntilWait() {
	ts.nextEvent()
	for ts.lastEvent != testHookWait {
		ts.nextEvent()
	}
}

func (ts *testSetup) setHistory(history uint64, noHistory bool) {
	if ts.fm != nil {
		ts.stopFm()
	}
	ts.fm = NewFilterMaps(ts.db, ts.chain, testParams, history, noHistory)
	ts.fm.testHook = ts.testHook
	ts.fm.Start()
	ts.lastEvent = <-ts.eventCh
}

func (ts *testSetup) testHook(event int) {
	ts.eventCh <- event
	<-ts.resumeCh
}

func (ts *testSetup) nextEvent() {
	ts.resumeCh <- struct{}{}
	ts.lastEvent = <-ts.eventCh
	ts.lastRange = ts.fm.getRange()
}
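testHook and nextEvent form a lock-step handshake: the indexer goroutine reports each event on eventCh and then blocks on resumeCh, so the test advances the indexer exactly one step at a time and can inspect lastRange between steps. The pattern in isolation (a generic sketch with hypothetical names, not code from this commit):

	// worker side: report a step, then block until the test resumes us.
	func reportStep(eventCh chan<- int, resumeCh <-chan struct{}, ev int) {
		eventCh <- ev
		<-resumeCh
	}

	// test side: resume the worker, then wait for its next reported step.
	func nextStep(eventCh <-chan int, resumeCh chan<- struct{}) int {
		resumeCh <- struct{}{}
		return <-eventCh
	}

Because both channels are unbuffered, neither side can run ahead of the other, which is what makes assertions on intermediate index state deterministic.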
func (ts *testSetup) stopFm() {
	close(ts.fm.closeCh)
	for {
		ts.nextEvent()
		if ts.lastEvent == testHookStop {
			break
		}
	}
	ts.resumeCh <- struct{}{}
	ts.fm.closeWg.Wait()
}

func (ts *testSetup) close() {
	ts.stopFm()
	ts.db.Close()
	ts.chain.db.Close()
}

type testChain struct {
	db            ethdb.Database
	lock          sync.RWMutex
	canonical     []common.Hash
	chainHeadFeed event.Feed
	blocks        map[common.Hash]*types.Block
	receipts      map[common.Hash]types.Receipts
}

func newTestChain() *testChain {
	return &testChain{
		blocks:   make(map[common.Hash]*types.Block),
		receipts: make(map[common.Hash]types.Receipts),
	}
}

func (tc *testChain) CurrentBlock() *types.Header {
	tc.lock.RLock()
	defer tc.lock.RUnlock()
	if len(tc.canonical) == 0 {
		return nil
	}
	return tc.blocks[tc.canonical[len(tc.canonical)-1]].Header()
}

func (tc *testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
	return tc.chainHeadFeed.Subscribe(ch)
}

func (tc *testChain) GetHeader(hash common.Hash, number uint64) *types.Header {
	tc.lock.RLock()
	defer tc.lock.RUnlock()
	return tc.blocks[hash].Header()
}

func (tc *testChain) GetCanonicalHash(number uint64) common.Hash {
	tc.lock.RLock()
	defer tc.lock.RUnlock()
	if uint64(len(tc.canonical)) <= number {
		return common.Hash{}
	}
	return tc.canonical[number]
}

func (tc *testChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
	tc.lock.RLock()
	defer tc.lock.RUnlock()
	return tc.receipts[hash]
}

func (tc *testChain) addBlocks(count, maxTxPerBlock, maxLogsPerReceipt, maxTopicsPerLog int, random bool) {
	tc.lock.Lock()
	defer tc.lock.Unlock()
	blockGen := func(i int, gen *core.BlockGen) {
		var txCount int
		if random {
			txCount = rand.Intn(maxTxPerBlock + 1)
		} else {
			txCount = maxTxPerBlock
		}
		for k := txCount; k > 0; k-- {
			receipt := types.NewReceipt(nil, false, 0)
			var logCount int
			if random {
				logCount = rand.Intn(maxLogsPerReceipt + 1)
			} else {
				logCount = maxLogsPerReceipt
			}
			receipt.Logs = make([]*types.Log, logCount)
			for i := range receipt.Logs {
				log := &types.Log{}
				receipt.Logs[i] = log
				rand.Read(log.Address[:])
				var topicCount int
				if random {
					topicCount = rand.Intn(maxTopicsPerLog + 1)
				} else {
					topicCount = maxTopicsPerLog
				}
				log.Topics = make([]common.Hash, topicCount)
				for j := range log.Topics {
					rand.Read(log.Topics[j][:])
				}
			}
			gen.AddUncheckedReceipt(receipt)
			gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, gen.BaseFee(), nil))
		}
	}
	var (
		blocks   []*types.Block
		receipts []types.Receipts
		engine   = ethash.NewFaker()
	)
	if len(tc.canonical) == 0 {
		gspec := &core.Genesis{
			Alloc:   types.GenesisAlloc{},
			BaseFee: big.NewInt(params.InitialBaseFee),
			Config:  params.TestChainConfig,
		}
		tc.db, blocks, receipts = core.GenerateChainWithGenesis(gspec, engine, count, blockGen)
		gblock := gspec.ToBlock()
		ghash := gblock.Hash()
		tc.canonical = []common.Hash{ghash}
		tc.blocks[ghash] = gblock
		tc.receipts[ghash] = types.Receipts{}
	} else {
		blocks, receipts = core.GenerateChain(params.TestChainConfig, tc.blocks[tc.canonical[len(tc.canonical)-1]], engine, tc.db, count, blockGen)
	}
	for i, block := range blocks {
		num, hash := int(block.NumberU64()), block.Hash()
		if len(tc.canonical) != num {
			panic("canonical chain length mismatch while adding blocks")
		}
		tc.canonical = append(tc.canonical, hash)
		tc.blocks[hash] = block
		if receipts[i] != nil {
			tc.receipts[hash] = receipts[i]
		} else {
			tc.receipts[hash] = types.Receipts{}
		}
	}
	tc.chainHeadFeed.Send(core.ChainHeadEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
}
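Because testChain is handed to NewFilterMaps, it must satisfy the blockchain interface; a compile-time assertion (not in the commit, but a common Go idiom) would catch signature drift early:

	// fails to compile if *testChain stops implementing blockchain
	var _ blockchain = (*testChain)(nil)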

@@ -360,6 +360,9 @@ func (s *Ethereum) Start() error {
// Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers)
+ // start log indexer
+ s.filterMaps.Start()
return nil
}
@@ -403,7 +406,7 @@ func (s *Ethereum) Stop() error {
s.handler.Stop()
// Then stop everything else.
- s.filterMaps.Close()
+ s.filterMaps.Stop()
s.txPool.Close()
s.blockchain.Stop()
s.engine.Close()
