From f8e98ae974b97cd27dc3c68f9a6ec36174e2f449 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 29 Sep 2024 10:38:49 +0200 Subject: [PATCH] core/filtermaps: add indexer test --- core/filtermaps/filtermaps.go | 21 ++- core/filtermaps/indexer.go | 50 +++++- core/filtermaps/indexer_test.go | 280 ++++++++++++++++++++++++++++++++ eth/backend.go | 5 +- 4 files changed, 339 insertions(+), 17 deletions(-) create mode 100644 core/filtermaps/indexer_test.go diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 600145fdca..75ad51ada9 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -23,6 +23,7 @@ type blockchain interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription GetHeader(hash common.Hash, number uint64) *types.Header GetCanonicalHash(number uint64) common.Hash + GetReceiptsByHash(hash common.Hash) types.Receipts } // FilterMaps is the in-memory representation of the log index structure that is @@ -33,7 +34,7 @@ type blockchain interface { // https://eips.ethereum.org/EIPS/eip-7745 type FilterMaps struct { lock sync.RWMutex - db ethdb.Database + db ethdb.KeyValueStore closeCh chan struct{} closeWg sync.WaitGroup history uint64 @@ -53,6 +54,8 @@ type FilterMaps struct { blockPtrCache *lru.Cache[uint32, uint64] lvPointerCache *lru.Cache[uint64, uint64] revertPoints map[uint64]*revertPoint + + testHook func(int) } // filterMap is a full or partial in-memory representation of a filter map where @@ -94,7 +97,7 @@ type filterMapsRange struct { // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep // the structure in sync with the given blockchain. -func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps { +func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps { rs, err := rawdb.ReadFilterMapsRange(db) if err != nil { log.Error("Error reading log index range", "error", err) @@ -128,14 +131,17 @@ func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history u log.Error("Error fetching tail block pointer, resetting log index", "error", err) fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database } - fm.closeWg.Add(2) - go fm.removeBloomBits() - go fm.updateLoop() return fm } +func (f *FilterMaps) Start() { + f.closeWg.Add(2) + go f.removeBloomBits() + go f.updateLoop() +} + // Close ensures that the indexer is fully stopped before returning. -func (f *FilterMaps) Close() { +func (f *FilterMaps) Stop() { close(f.closeCh) f.closeWg.Wait() } @@ -297,8 +303,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { } } // get block receipts - hash := f.chain.GetCanonicalHash(firstBlockNumber) - receipts := rawdb.ReadRawReceipts(f.db, hash, firstBlockNumber) //TODO small cache + receipts := f.chain.GetReceiptsByHash(f.chain.GetCanonicalHash(firstBlockNumber)) if receipts == nil { return nil, errors.New("receipts not found") } diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 6d94e10b45..c5a0fbb485 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -22,7 +22,12 @@ const ( // updateLoop initializes and updates the log index structure according to the // canonical chain. func (f *FilterMaps) updateLoop() { - defer f.closeWg.Done() + defer func() { + f.closeWg.Done() + if f.testHook != nil { + f.testHook(testHookStop) + } + }() if f.noHistory { f.reset() @@ -38,7 +43,7 @@ func (f *FilterMaps) updateLoop() { var ( headEventCh = make(chan core.ChainHeadEvent, 10) sub = f.chain.SubscribeChainHeadEvent(headEventCh) - head *types.Header + head = f.chain.CurrentBlock() stop bool syncMatcher *FilterMapsMatcherBackend ) @@ -59,16 +64,21 @@ func (f *FilterMaps) updateLoop() { if stop { return } + delay := time.Second * 20 + if f.testHook != nil { + f.testHook(testHookWait) + delay = 0 + } select { case ev := <-headEventCh: head = ev.Block.Header() case syncMatcher = <-f.matcherSyncCh: head = f.chain.CurrentBlock() - case <-time.After(time.Second * 20): - // keep updating log index during syncing - head = f.chain.CurrentBlock() case <-f.closeCh: stop = true + case <-time.After(delay): + // keep updating log index during syncing + head = f.chain.CurrentBlock() } } for head == nil { @@ -151,7 +161,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { if !f.reset() { return false } - receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64()) + receipts := f.chain.GetReceiptsByHash(head.Hash()) if receipts == nil { log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash()) return true @@ -161,6 +171,9 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { log.Error("Could not initialize log index", "error", err) } f.applyUpdateBatch(update) + if f.testHook != nil { + f.testHook(testHookInit) + } return true } @@ -222,7 +235,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { update := f.newUpdateBatch() for i := len(newHeaders) - 1; i >= 0; i-- { newHeader := newHeaders[i] - receipts := rawdb.ReadRawReceipts(f.db, newHeader.Hash(), newHeader.Number.Uint64()) + receipts := f.chain.GetReceiptsByHash(newHeader.Hash()) if receipts == nil { log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash()) break @@ -234,10 +247,16 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { if update.updatedRangeLength() >= f.mapsPerEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) + if f.testHook != nil { + f.testHook(testHookUpdateHeadEpoch) + } update = f.newUpdateBatch() } } f.applyUpdateBatch(update) + if f.testHook != nil { + f.testHook(testHookUpdateHead) + } return true } @@ -273,6 +292,9 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) { if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) + if f.testHook != nil { + f.testHook(testHookExtendTailEpoch) + } update = f.newUpdateBatch() lastTailEpoch = tailEpoch } @@ -281,7 +303,7 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) { log.Error("Tail header not found", "number", number-1, "hash", parentHash) break } - receipts := rawdb.ReadRawReceipts(f.db, newTail.Hash(), newTail.Number.Uint64()) + receipts := f.chain.GetReceiptsByHash(newTail.Hash()) if receipts == nil { log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash()) break @@ -293,6 +315,9 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) { number, parentHash = newTail.Number.Uint64(), newTail.ParentHash } f.applyUpdateBatch(update) + if f.testHook != nil { + f.testHook(testHookExtendTail) + } } // pruneTailPtr updates the tail block number and hash and the corresponding @@ -330,6 +355,9 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) { fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash fmr.tailBlockLvPointer = targetLvPointer f.setRange(f.db, fmr) + if f.testHook != nil { + f.testHook(testHookPruneTail) + } } // tryPruneTailMaps removes unused filter maps and corresponding log index @@ -401,6 +429,9 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) { if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } + if f.testHook != nil { + f.testHook(testHookPruneTailMaps) + } } // updateBatch is a memory overlay collecting changes to the index log structure @@ -799,5 +830,8 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error { if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } + if f.testHook != nil { + f.testHook(testHookRevert) + } return nil } diff --git a/core/filtermaps/indexer_test.go b/core/filtermaps/indexer_test.go new file mode 100644 index 0000000000..5fd3f12ce5 --- /dev/null +++ b/core/filtermaps/indexer_test.go @@ -0,0 +1,280 @@ +package filtermaps + +import ( + "math/big" + "math/rand" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" +) + +const ( + testHookInit = iota + testHookUpdateHeadEpoch + testHookUpdateHead + testHookExtendTailEpoch + testHookExtendTail + testHookPruneTail + testHookPruneTailMaps + testHookRevert + testHookWait + testHookStop +) + +var testParams = Params{ + logMapHeight: 2, + logMapsPerEpoch: 4, + logValuesPerMap: 4, +} + +func TestIndexerSetHistory(t *testing.T) { + ts := newTestSetup(t) + ts.setHistory(0, false) + ts.chain.addBlocks(1000, 5, 2, 4, false) // 50 log values per block + ts.runUntilWait() + ts.setHistory(100, false) + ts.runUntil(func() bool { + l := ts.lastRange.headLvPointer - ts.lastRange.tailLvPointer + return l > 44000 && l < 45000 + }) + ts.setHistory(200, false) + ts.runUntilWait() + ts.setHistory(0, false) + ts.runUntilWait() + if ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer != 50000 { + t.Fatalf("Invalid number of log values in the final state (expected %d, got %d)", 50000, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer) + } +} + +func TestIndexerRandomSetHistory(t *testing.T) { + ts := newTestSetup(t) + ts.chain.addBlocks(100, 5, 2, 4, false) // 50 log values per block + for i := 0; i < 3000; i++ { + ts.setHistory(uint64(rand.Intn(1001)), false) + ts.nextEvent() + for rand.Intn(20) != 0 && ts.lastEvent != testHookWait { + ts.nextEvent() + } + } + ts.setHistory(0, false) + ts.runUntilWait() + if ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer != 5000 { + t.Fatalf("Invalid number of log values in the final state (expected %d, got %d)", 5000, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer) + } +} + +type testSetup struct { + t *testing.T + fm *FilterMaps + db ethdb.Database + chain *testChain + eventCh chan int + resumeCh chan struct{} + lastEvent int + lastRange filterMapsRange +} + +func newTestSetup(t *testing.T) *testSetup { + return &testSetup{ + t: t, + chain: newTestChain(), + db: rawdb.NewMemoryDatabase(), + eventCh: make(chan int), + resumeCh: make(chan struct{}), + } +} + +func (ts *testSetup) runUntil(stop func() bool) { + for !stop() { + ts.nextEvent() + for ts.lastEvent == testHookWait { + ts.t.Fatalf("Indexer in waiting state before runUntil condition is met") + } + } +} + +func (ts *testSetup) runUntilWait() { + ts.nextEvent() + for ts.lastEvent != testHookWait { + ts.nextEvent() + } +} + +func (ts *testSetup) setHistory(history uint64, noHistory bool) { + if ts.fm != nil { + ts.stopFm() + } + ts.fm = NewFilterMaps(ts.db, ts.chain, testParams, history, noHistory) + ts.fm.testHook = ts.testHook + ts.fm.Start() + ts.lastEvent = <-ts.eventCh +} + +func (ts *testSetup) testHook(event int) { + ts.eventCh <- event + <-ts.resumeCh +} + +func (ts *testSetup) nextEvent() { + ts.resumeCh <- struct{}{} + ts.lastEvent = <-ts.eventCh + ts.lastRange = ts.fm.getRange() +} + +func (ts *testSetup) stopFm() { + close(ts.fm.closeCh) + for { + ts.nextEvent() + if ts.lastEvent == testHookStop { + break + } + } + ts.resumeCh <- struct{}{} + ts.fm.closeWg.Wait() +} + +func (ts *testSetup) close() { + ts.stopFm() + ts.db.Close() + ts.chain.db.Close() +} + +type testChain struct { + db ethdb.Database + lock sync.RWMutex + canonical []common.Hash + chainHeadFeed event.Feed + blocks map[common.Hash]*types.Block + receipts map[common.Hash]types.Receipts +} + +func newTestChain() *testChain { + return &testChain{ + blocks: make(map[common.Hash]*types.Block), + receipts: make(map[common.Hash]types.Receipts), + } +} + +func (tc *testChain) CurrentBlock() *types.Header { + tc.lock.RLock() + defer tc.lock.RUnlock() + + if len(tc.canonical) == 0 { + return nil + } + return tc.blocks[tc.canonical[len(tc.canonical)-1]].Header() +} + +func (tc *testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return tc.chainHeadFeed.Subscribe(ch) +} + +func (tc *testChain) GetHeader(hash common.Hash, number uint64) *types.Header { + tc.lock.RLock() + defer tc.lock.RUnlock() + + return tc.blocks[hash].Header() +} + +func (tc *testChain) GetCanonicalHash(number uint64) common.Hash { + tc.lock.RLock() + defer tc.lock.RUnlock() + + if uint64(len(tc.canonical)) <= number { + return common.Hash{} + } + return tc.canonical[number] +} + +func (tc *testChain) GetReceiptsByHash(hash common.Hash) types.Receipts { + tc.lock.RLock() + defer tc.lock.RUnlock() + + return tc.receipts[hash] +} + +func (tc *testChain) addBlocks(count, maxTxPerBlock, maxLogsPerReceipt, maxTopicsPerLog int, random bool) { + tc.lock.Lock() + defer tc.lock.Unlock() + + blockGen := func(i int, gen *core.BlockGen) { + var txCount int + if random { + txCount = rand.Intn(maxTxPerBlock + 1) + } else { + txCount = maxTxPerBlock + } + for k := txCount; k > 0; k-- { + receipt := types.NewReceipt(nil, false, 0) + var logCount int + if random { + logCount = rand.Intn(maxLogsPerReceipt + 1) + } else { + logCount = maxLogsPerReceipt + } + receipt.Logs = make([]*types.Log, logCount) + for i := range receipt.Logs { + log := &types.Log{} + receipt.Logs[i] = log + rand.Read(log.Address[:]) + var topicCount int + if random { + topicCount = rand.Intn(maxTopicsPerLog + 1) + } else { + topicCount = maxTopicsPerLog + } + log.Topics = make([]common.Hash, topicCount) + for j := range log.Topics { + rand.Read(log.Topics[j][:]) + } + } + gen.AddUncheckedReceipt(receipt) + gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, gen.BaseFee(), nil)) + } + } + + var ( + blocks []*types.Block + receipts []types.Receipts + engine = ethash.NewFaker() + ) + + if len(tc.canonical) == 0 { + gspec := &core.Genesis{ + Alloc: types.GenesisAlloc{}, + BaseFee: big.NewInt(params.InitialBaseFee), + Config: params.TestChainConfig, + } + tc.db, blocks, receipts = core.GenerateChainWithGenesis(gspec, engine, count, blockGen) + gblock := gspec.ToBlock() + ghash := gblock.Hash() + tc.canonical = []common.Hash{ghash} + tc.blocks[ghash] = gblock + tc.receipts[ghash] = types.Receipts{} + } else { + blocks, receipts = core.GenerateChain(params.TestChainConfig, tc.blocks[tc.canonical[len(tc.canonical)-1]], engine, tc.db, count, blockGen) + } + + for i, block := range blocks { + num, hash := int(block.NumberU64()), block.Hash() + if len(tc.canonical) != num { + panic(nil) + } + tc.canonical = append(tc.canonical, hash) + tc.blocks[hash] = block + if receipts[i] != nil { + tc.receipts[hash] = receipts[i] + } else { + tc.receipts[hash] = types.Receipts{} + } + } + tc.chainHeadFeed.Send(core.ChainHeadEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]}) +} diff --git a/eth/backend.go b/eth/backend.go index b5a5be3994..283ad20dcb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -360,6 +360,9 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) + + // start log indexer + s.filterMaps.Start() return nil } @@ -403,7 +406,7 @@ func (s *Ethereum) Stop() error { s.handler.Stop() // Then stop everything else. - s.filterMaps.Close() + s.filterMaps.Stop() s.txPool.Close() s.blockchain.Stop() s.engine.Close()