From bca0bfe7f1ddce97c0c5d04ce6e987a856a917cd Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 19 Sep 2024 17:14:27 +0200 Subject: [PATCH] core/filtermaps: safe concurrent index update and search --- core/filtermaps/filtermaps.go | 65 ++++-------- core/filtermaps/indexer.go | 56 +++++++--- core/filtermaps/matcher.go | 117 ++++++++++++++++++++- core/filtermaps/matcher_backend.go | 158 +++++++++++++++++++++++++++++ eth/api_backend.go | 5 +- eth/backend.go | 1 - eth/filters/filter.go | 41 ++------ eth/filters/filter_system.go | 4 +- internal/ethapi/backend.go | 4 +- 9 files changed, 348 insertions(+), 103 deletions(-) create mode 100644 core/filtermaps/matcher_backend.go diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 87fec8a0a3..8f5e436d9f 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -1,7 +1,6 @@ package filtermaps import ( - "context" "crypto/sha256" "encoding/binary" "errors" @@ -14,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -28,6 +28,14 @@ const ( headCacheSize = 8 // maximum number of recent filter maps cached in memory ) +// blockchain defines functions required by the FilterMaps log indexer. +type blockchain interface { + CurrentBlock() *types.Header + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + GetHeader(hash common.Hash, number uint64) *types.Header + GetCanonicalHash(number uint64) common.Hash +} + // FilterMaps is the in-memory representation of the log index structure that is // responsible for building and updating the index according to the canonical // chain. @@ -38,10 +46,10 @@ type FilterMaps struct { lock sync.RWMutex db ethdb.Database closeCh chan chan struct{} - filterMapsRange - chain *core.BlockChain - + chain blockchain + matcherSyncCh chan *FilterMapsMatcherBackend + matchers map[*FilterMapsMatcherBackend]struct{} // filterMapCache caches certain filter maps (headCacheSize most recent maps // and one tail map) that are expected to be frequently accessed and modified // while updating the structure. Note that the set of cached maps depends @@ -86,7 +94,7 @@ type filterMapsRange struct { // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep // the structure in sync with the given blockchain. -func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps { +func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps { rs, err := rawdb.ReadFilterMapsRange(db) if err != nil { log.Error("Error reading log index range", "error", err) @@ -104,6 +112,8 @@ func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps { headBlockHash: rs.HeadBlockHash, tailParentHash: rs.TailParentHash, }, + matcherSyncCh: make(chan *FilterMapsMatcherBackend), + matchers: make(map[*FilterMapsMatcherBackend]struct{}), filterMapCache: make(map[uint32]*filterMap), blockPtrCache: lru.NewCache[uint32, uint64](1000), lvPointerCache: lru.NewCache[uint64, uint64](1000), @@ -129,46 +139,6 @@ func (f *FilterMaps) Close() { <-ch } -// FilterMapsMatcherBackend implements MatcherBackend. -type FilterMapsMatcherBackend FilterMaps - -// GetFilterMapRow returns the given row of the given map. If the row is empty -// then a non-nil zero length row is returned. -// Note that the returned slices should not be modified, they should be copied -// on write. -// GetFilterMapRow implements MatcherBackend. -func (ff *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) { - f := (*FilterMaps)(ff) - return f.getFilterMapRow(mapIndex, rowIndex) -} - -// GetBlockLvPointer returns the starting log value index where the log values -// generated by the given block are located. If blockNumber is beyond the current -// head then the first unoccupied log value index is returned. -// GetBlockLvPointer implements MatcherBackend. -func (ff *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) { - f := (*FilterMaps)(ff) - f.lock.RLock() - defer f.lock.RUnlock() - - return f.getBlockLvPointer(blockNumber) -} - -// GetLogByLvIndex returns the log at the given log value index. If the index does -// not point to the first log value entry of a log then no log and no error are -// returned as this can happen when the log value index was a false positive. -// Note that this function assumes that the log index structure is consistent -// with the canonical chain at the point where the given log value index points. -// If this is not the case then an invalid result or an error may be returned. -// GetLogByLvIndex implements MatcherBackend. -func (ff *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) { - f := (*FilterMaps)(ff) - f.lock.RLock() - defer f.lock.RUnlock() - - return f.getLogByLvIndex(lvIndex) -} - // reset un-initializes the FilterMaps structure and removes all related data from // the database. // Note that this function assumes that the read/write lock is being held. @@ -224,6 +194,7 @@ func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) { } rawdb.WriteFilterMapsRange(batch, rs) f.updateMapCache() + f.updateMatchersValidRange() } // updateMapCache updates the maps covered by the filterMapCache according to the @@ -266,7 +237,7 @@ func (f *FilterMaps) updateMapCache() { // Note that this function assumes that the read lock is being held. func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer { - return nil, errors.New("log value index outside available range") + return nil, nil } // find possible block range based on map to block pointers mapIndex := uint32(lvIndex >> logValuesPerMap) @@ -321,7 +292,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { lvPointer += uint64(len(log.Topics) + 1) } } - return nil, errors.New("log value index not found") + return nil, nil } // getFilterMapRow returns the given row of the given map. If the row is empty diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 0d050923ee..a9b1b56136 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -22,30 +22,35 @@ const ( // updateLoop initializes and updates the log index structure according to the // canonical chain. func (f *FilterMaps) updateLoop() { - headEventCh := make(chan core.ChainHeadEvent) - sub := f.chain.SubscribeChainHeadEvent(headEventCh) - defer sub.Unsubscribe() + var ( + headEventCh = make(chan core.ChainHeadEvent) + sub = f.chain.SubscribeChainHeadEvent(headEventCh) + head *types.Header + stop bool + syncMatcher *FilterMapsMatcherBackend + ) - head := f.chain.CurrentBlock() - if head == nil { - select { - case ev := <-headEventCh: - head = ev.Block.Header() - case ch := <-f.closeCh: - close(ch) - return + defer func() { + sub.Unsubscribe() + if syncMatcher != nil { + syncMatcher.synced(head) + syncMatcher = nil } - } - fmr := f.getRange() + }() - var stop bool wait := func() { + if syncMatcher != nil { + syncMatcher.synced(head) + syncMatcher = nil + } if stop { return } select { case ev := <-headEventCh: head = ev.Block.Header() + case syncMatcher = <-f.matcherSyncCh: + head = f.chain.CurrentBlock() case <-time.After(time.Second * 20): // keep updating log index during syncing head = f.chain.CurrentBlock() @@ -54,10 +59,21 @@ func (f *FilterMaps) updateLoop() { stop = true } } + for head == nil { + wait() + if stop { + return + } + } + fmr := f.getRange() for !stop { if !fmr.initialized { f.tryInit(head) + if syncMatcher != nil { + syncMatcher.synced(head) + syncMatcher = nil + } fmr = f.getRange() if !fmr.initialized { wait() @@ -73,12 +89,18 @@ func (f *FilterMaps) updateLoop() { continue } } + if syncMatcher != nil { + syncMatcher.synced(head) + syncMatcher = nil + } // log index head is at latest chain head; process tail blocks if possible f.tryExtendTail(func() bool { // return true if tail processing needs to be stopped select { case ev := <-headEventCh: head = ev.Block.Header() + case syncMatcher = <-f.matcherSyncCh: + head = f.chain.CurrentBlock() case ch := <-f.closeCh: close(ch) stop = true @@ -549,6 +571,9 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) { // number from memory cache or from the database if available. If no such revert // point is available then it returns no result and no error. func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) { + f.lock.RLock() + defer f.lock.RUnlock() + if blockNumber > f.headBlockNumber { blockNumber = f.headBlockNumber } @@ -577,6 +602,9 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) { // revertTo reverts the log index to the given revert point. func (f *FilterMaps) revertTo(rp *revertPoint) error { + f.lock.Lock() + defer f.lock.Unlock() + batch := f.db.NewBatch() afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap) if rp.mapIndex >= afterLastMap { diff --git a/core/filtermaps/matcher.go b/core/filtermaps/matcher.go index 3bc08494e1..8e2d236b6b 100644 --- a/core/filtermaps/matcher.go +++ b/core/filtermaps/matcher.go @@ -2,6 +2,7 @@ package filtermaps import ( "context" + "errors" "math" "sync" "sync/atomic" @@ -18,13 +19,121 @@ type MatcherBackend interface { GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) + SyncLogIndex(ctx context.Context) (SyncRange, error) + Close() +} + +// SyncRange is returned by MatcherBackend.SyncLogIndex. It contains the latest +// chain head, the indexed range that is currently consistent with the chain +// and the valid range that has not been changed and has been consistent with +// all states of the chain since the previous SyncLogIndex or the creation of +// the matcher backend. +type SyncRange struct { + Head *types.Header + // block range where the index has not changed since the last matcher sync + // and therefore the set of matches found in this region is guaranteed to + // be valid and complete. + Valid bool + FirstValid, LastValid uint64 + // block range indexed according to the given chain head. + Indexed bool + FirstIndexed, LastIndexed uint64 } // GetPotentialMatches returns a list of logs that are potential matches for the -// given filter criteria. Note that the returned list may still contain false -// positives. -//TODO add protection against reorgs during search -func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) { +// given filter criteria. If parts of the requested range are not indexed then +// an error is returned. If parts of the requested range are changed during the +// search process then potentially incorrect logs are discarded and searched +// again, ensuring that the returned results are always consistent with the latest +// state of the chain. +// If firstBlock or lastBlock are bigger than the head block number then they are +// substituted with the latest head of the chain, ensuring that a search until +// the head block is still consistent with the latest canonical chain if a new +// head has been added during the process. +// Note that the returned list may still contain false positives. +func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, *types.Header, uint64, uint64, error) { + if firstBlock > lastBlock { + return nil, nil, 0, 0, errors.New("invalid search range") + } + // enforce a consistent state before starting the search in order to be able + // to determine valid range later + syncRange, err := backend.SyncLogIndex(ctx) + if err != nil { + return nil, nil, 0, 0, err + } + headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil + // if haveMatches == true then matches correspond to the block number range + // between matchFirst and matchLast + var ( + matches []*types.Log + haveMatches bool + matchFirst, matchLast uint64 + ) + for !haveMatches || (matchLast < lastBlock && matchLast < headBlock) { + // determine range to be searched; for simplicity we only extend the most + // recent end of the existing match set by matching between searchFirst + // and searchLast. + searchFirst, searchLast := firstBlock, lastBlock + if searchFirst > headBlock { + searchFirst = headBlock + } + if searchLast > headBlock { + searchLast = headBlock + } + if haveMatches && matchFirst != searchFirst { + // searchFirst might change if firstBlock > headBlock + matches, haveMatches = nil, false + } + if haveMatches && matchLast >= searchFirst { + searchFirst = matchLast + 1 + } + // check if indexed range covers the requested range + if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst || syncRange.LastIndexed < searchLast { + return nil, nil, 0, 0, errors.New("log index not available for requested range") + } + // search for matches in the required range + newMatches, err := getPotentialMatches(ctx, backend, searchFirst, searchLast, addresses, topics) + if err != nil { + return nil, nil, 0, 0, err + } + // enforce a consistent state again in order to determine the guaranteed + // valid range in which the log index has not been changed since the last + // sync. + syncRange, err = backend.SyncLogIndex(ctx) + if err != nil { + return nil, nil, 0, 0, err + } + headBlock = syncRange.Head.Number.Uint64() + // return with error if the beginning of the recently searched range might + // be invalid due to removed log index + if !syncRange.Valid || syncRange.FirstValid > searchFirst || syncRange.LastValid < searchFirst { + return nil, nil, 0, 0, errors.New("log index not available for requested range") + } + // roll back most recent matches if they are not covered by the guaranteed + // valid range + if syncRange.LastValid < searchLast { + for len(newMatches) > 0 && newMatches[len(newMatches)-1].BlockNumber > syncRange.LastValid { + newMatches = newMatches[:len(newMatches)-1] + } + searchLast = syncRange.LastValid + } + // append new matches to existing ones if the were any + if haveMatches { + matches = append(matches, newMatches...) + } else { + matches, haveMatches = newMatches, true + } + matchLast = searchLast + } + return matches, syncRange.Head, firstBlock, matchLast, nil +} + +// getPotentialMatches returns a list of logs that are potential matches for the +// given filter criteria. If parts of the log index in the searched range are +// missing or changed during the search process then the resulting logs belonging +// to that block range might be missing or incorrect. +// Also note that the returned list may contain false positives. +func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) { // find the log value index range to search firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock) if err != nil { diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go new file mode 100644 index 0000000000..b340167bb4 --- /dev/null +++ b/core/filtermaps/matcher_backend.go @@ -0,0 +1,158 @@ +package filtermaps + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/core/types" +) + +// FilterMapsMatcherBackend implements MatcherBackend. +type FilterMapsMatcherBackend struct { + f *FilterMaps + valid bool + firstValid, lastValid uint64 + syncCh chan SyncRange +} + +// NewMatcherBackend returns a FilterMapsMatcherBackend after registering it in +// the active matcher set. +// Note that Close should always be called when the matcher is no longer used. +func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend { + f.lock.Lock() + defer f.lock.Unlock() + + fm := &FilterMapsMatcherBackend{ + f: f, + valid: f.initialized, + firstValid: f.tailBlockNumber, + lastValid: f.headBlockNumber, + } + f.matchers[fm] = struct{}{} + return fm +} + +// updateMatchersValidRange iterates through active matchers and limits their +// valid range with the current indexed range. This function should be called +// whenever a part of the log index has been removed, before adding new blocks +// to it. +func (f *FilterMaps) updateMatchersValidRange() { + for fm := range f.matchers { + if !f.initialized { + fm.valid = false + } + if !fm.valid { + continue + } + if fm.firstValid < f.tailBlockNumber { + fm.firstValid = f.tailBlockNumber + } + if fm.lastValid > f.headBlockNumber { + fm.lastValid = f.headBlockNumber + } + if fm.firstValid > fm.lastValid { + fm.valid = false + } + } +} + +// Close removes the matcher from the set of active matchers and ensures that +// any SyncLogIndex calls are cancelled. +func (fm *FilterMapsMatcherBackend) Close() { + fm.f.lock.Lock() + defer fm.f.lock.Unlock() + + delete(fm.f.matchers, fm) +} + +// GetFilterMapRow returns the given row of the given map. If the row is empty +// then a non-nil zero length row is returned. +// Note that the returned slices should not be modified, they should be copied +// on write. +// GetFilterMapRow implements MatcherBackend. +func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) { + return fm.f.getFilterMapRow(mapIndex, rowIndex) +} + +// GetBlockLvPointer returns the starting log value index where the log values +// generated by the given block are located. If blockNumber is beyond the current +// head then the first unoccupied log value index is returned. +// GetBlockLvPointer implements MatcherBackend. +func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) { + fm.f.lock.RLock() + defer fm.f.lock.RUnlock() + + return fm.f.getBlockLvPointer(blockNumber) +} + +// GetLogByLvIndex returns the log at the given log value index. +// Note that this function assumes that the log index structure is consistent +// with the canonical chain at the point where the given log value index points. +// If this is not the case then an invalid result may be returned or certain +// logs might not be returned at all. +// No error is returned though because of an inconsistency between the chain and +// the log index. It is the caller's responsibility to verify this consistency +// using SyncLogIndex and re-process certain blocks if necessary. +// GetLogByLvIndex implements MatcherBackend. +func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) { + fm.f.lock.RLock() + defer fm.f.lock.RUnlock() + + return fm.f.getLogByLvIndex(lvIndex) +} + +// synced signals to the matcher that has triggered a synchronisation that it +// has been finished and the log index is consistent with the chain head passed +// as a parameter. +// Note that if the log index head was far behind the chain head then it might not +// be synced up to the given head in a single step. Still, the latest chain head +// should be passed as a parameter and the existing log index should be consistent +// with that chain. +func (fm *FilterMapsMatcherBackend) synced(head *types.Header) { + fm.f.lock.Lock() + defer fm.f.lock.Unlock() + + fm.syncCh <- SyncRange{ + Head: head, + Valid: fm.valid, + FirstValid: fm.firstValid, + LastValid: fm.lastValid, + Indexed: fm.f.initialized, + FirstIndexed: fm.f.tailBlockNumber, + LastIndexed: fm.f.headBlockNumber, + } + fm.valid = fm.f.initialized + fm.firstValid = fm.f.tailBlockNumber + fm.lastValid = fm.f.headBlockNumber + fm.syncCh = nil +} + +// SyncLogIndex ensures that the log index is consistent with the current state +// of the chain (note that it may or may not be actually synced up to the head). +// It blocks until this state is achieved. +// If successful, it returns a SyncRange that contains the latest chain head, +// the indexed range that is currently consistent with the chain and the valid +// range that has not been changed and has been consistent with all states of the +// chain since the previous SyncLogIndex or the creation of the matcher backend. +func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) { + // add SyncRange return channel, ensuring that + syncCh := make(chan SyncRange, 1) + fm.f.lock.Lock() + fm.syncCh = syncCh + fm.f.lock.Unlock() + + select { + case fm.f.matcherSyncCh <- fm: + case <-ctx.Done(): + return SyncRange{}, ctx.Err() + } + select { + case vr := <-syncCh: + if vr.Head == nil { + return SyncRange{}, errors.New("canonical chain head not available") + } + return vr, nil + case <-ctx.Done(): + return SyncRange{}, ctx.Err() + } +} diff --git a/eth/api_backend.go b/eth/api_backend.go index c40f2a1bf7..0461bfd597 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -45,7 +45,6 @@ import ( // EthAPIBackend implements ethapi.Backend and tracers.Backend for full nodes type EthAPIBackend struct { - *filtermaps.FilterMapsMatcherBackend extRPCEnabled bool allowUnprotectedTxs bool eth *Ethereum @@ -414,6 +413,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma } } +func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend { + return b.eth.filterMaps.NewMatcherBackend() +} + func (b *EthAPIBackend) Engine() consensus.Engine { return b.eth.engine } diff --git a/eth/backend.go b/eth/backend.go index c4007f4f48..0dc5b23d5d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -261,7 +261,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.APIBackend = &EthAPIBackend{ - FilterMapsMatcherBackend: (*filtermaps.FilterMapsMatcherBackend)(eth.filterMaps), extRPCEnabled: stack.Config().ExtRPCEnabled(), allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs, eth: eth, diff --git a/eth/filters/filter.go b/eth/filters/filter.go index e3d1adc5fe..343fa5f0ff 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -19,10 +19,8 @@ package filters import ( "context" "errors" - "fmt" + "math" "math/big" - - //"reflect" "slices" "time" @@ -30,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -120,29 +119,26 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } resolveSpecial := func(number int64) (int64, error) { - var hdr *types.Header switch number { case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64(): // we should return head here since we've already captured // that we need to get the pending logs in the pending boolean above - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if hdr == nil { - return 0, errors.New("latest header not found") - } + return math.MaxInt64, nil case rpc.FinalizedBlockNumber.Int64(): - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) if hdr == nil { return 0, errors.New("finalized header not found") } + return hdr.Number.Int64(), nil case rpc.SafeBlockNumber.Int64(): - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber) + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber) if hdr == nil { return 0, errors.New("safe header not found") } + return hdr.Number.Int64(), nil default: return number, nil } - return hdr.Number.Int64(), nil } var err error @@ -155,26 +151,11 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } start := time.Now() - logs, err := filtermaps.GetPotentialMatches(ctx, f.sys.backend, uint64(f.begin), uint64(f.end), f.addresses, f.topics) + mb := f.sys.backend.NewMatcherBackend() + logs, _, _, _, err := filtermaps.GetPotentialMatches(ctx, mb, uint64(f.begin), uint64(f.end), f.addresses, f.topics) + mb.Close() fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics) - fmt.Println("filtermaps (new) runtime", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs)) - - //TODO remove - /*f.bbMatchCount = 0 - start = time.Now() - logChan, errChan := f.rangeLogsAsync(ctx) - var bbLogs []*types.Log - loop: - for { - select { - case log := <-logChan: - bbLogs = append(bbLogs, log) - case <-errChan: - break loop - } - } - fmt.Println("bloombits (old) runtime", time.Since(start), "true matches", len(bbLogs), "false positives", f.bbMatchCount-uint64(len(bbLogs))) - fmt.Println("DeepEqual", reflect.DeepEqual(fmLogs, bbLogs))*/ + log.Debug("Finished log search", "run time", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs)) return fmLogs, err } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index a6ed7a8c23..6c3d8be86b 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -73,9 +73,7 @@ type Backend interface { BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) - GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) - GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error) - GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) + NewMatcherBackend() filtermaps.MatcherBackend } // FilterSystem holds resources shared by all filters. diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 2891d6095d..f09f0769f5 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -97,9 +97,7 @@ type Backend interface { BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) - GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) - GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error) - GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) + NewMatcherBackend() filtermaps.MatcherBackend } func GetAPIs(apiBackend Backend) []rpc.API {