diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 3ba2856e29..600145fdca 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -1,10 +1,7 @@ package filtermaps import ( - "crypto/sha256" - "encoding/binary" "errors" - "sort" "sync" "time" @@ -18,16 +15,7 @@ import ( "github.com/ethereum/go-ethereum/log" ) -const ( - logMapHeight = 12 // log2(mapHeight) - mapHeight = 1 << logMapHeight // filter map height (number of rows) - logMapsPerEpoch = 6 // log2(mmapsPerEpochapsPerEpoch) - mapsPerEpoch = 1 << logMapsPerEpoch // number of maps in an epoch - logValuesPerMap = 16 // log2(logValuesPerMap) - valuesPerMap = 1 << logValuesPerMap // number of log values marked on each filter map - - headCacheSize = 8 // maximum number of recent filter maps cached in memory -) +const headCacheSize = 8 // maximum number of recent filter maps cached in memory // blockchain defines functions required by the FilterMaps log indexer. type blockchain interface { @@ -51,6 +39,7 @@ type FilterMaps struct { history uint64 noHistory bool + Params filterMapsRange chain blockchain matcherSyncCh chan *FilterMapsMatcherBackend @@ -60,7 +49,7 @@ type FilterMaps struct { // while updating the structure. Note that the set of cached maps depends // only on filterMapsRange and rows of other maps are not cached here. filterMapLock sync.Mutex - filterMapCache map[uint32]*filterMap + filterMapCache map[uint32]filterMap blockPtrCache *lru.Cache[uint32, uint64] lvPointerCache *lru.Cache[uint64, uint64] revertPoints map[uint64]*revertPoint @@ -73,7 +62,7 @@ type FilterMaps struct { // It can be used as a memory cache or an overlay while preparing a batch of // changes to the structure. In either case a nil value should be interpreted // as transparent (uncached/unchanged). -type filterMap [mapHeight]FilterRow +type filterMap []FilterRow // FilterRow encodes a single row of a filter map as a list of column indices. 
// Note that the values are always stored in the same order as they were added @@ -105,17 +94,19 @@ type filterMapsRange struct { // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep // the structure in sync with the given blockchain. -func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistory bool) *FilterMaps { +func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps { rs, err := rawdb.ReadFilterMapsRange(db) if err != nil { log.Error("Error reading log index range", "error", err) } + params.deriveFields() fm := &FilterMaps{ db: db, chain: chain, closeCh: make(chan struct{}), history: history, noHistory: noHistory, + Params: params, filterMapsRange: filterMapsRange{ initialized: rs.Initialized, headLvPointer: rs.HeadLvPointer, @@ -127,7 +118,7 @@ func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistor }, matcherSyncCh: make(chan *FilterMapsMatcherBackend), matchers: make(map[*FilterMapsMatcherBackend]struct{}), - filterMapCache: make(map[uint32]*filterMap), + filterMapCache: make(map[uint32]filterMap), blockPtrCache: lru.NewCache[uint32, uint64](1000), lvPointerCache: lru.NewCache[uint64, uint64](1000), revertPoints: make(map[uint64]*revertPoint), @@ -154,7 +145,7 @@ func (f *FilterMaps) Close() { func (f *FilterMaps) reset() bool { f.lock.Lock() f.filterMapsRange = filterMapsRange{} - f.filterMapCache = make(map[uint32]*filterMap) + f.filterMapCache = make(map[uint32]filterMap) f.revertPoints = make(map[uint64]*revertPoint) f.blockPtrCache.Purge() f.lvPointerCache.Purge() @@ -242,21 +233,21 @@ func (f *FilterMaps) updateMapCache() { f.filterMapLock.Lock() defer f.filterMapLock.Unlock() - newFilterMapCache := make(map[uint32]*filterMap) - firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) + newFilterMapCache := make(map[uint32]filterMap) + firstMap, 
afterLastMap := uint32(f.tailBlockLvPointer>>f.logValuesPerMap), uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap) headCacheFirst := firstMap + 1 if afterLastMap > headCacheFirst+headCacheSize { headCacheFirst = afterLastMap - headCacheSize } fm := f.filterMapCache[firstMap] if fm == nil { - fm = new(filterMap) + fm = make(filterMap, f.mapHeight) } newFilterMapCache[firstMap] = fm for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ { fm := f.filterMapCache[mapIndex] if fm == nil { - fm = new(filterMap) + fm = make(filterMap, f.mapHeight) } newFilterMapCache[mapIndex] = fm } @@ -275,7 +266,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { return nil, nil } // find possible block range based on map to block pointers - mapIndex := uint32(lvIndex >> logValuesPerMap) + mapIndex := uint32(lvIndex >> f.logValuesPerMap) firstBlockNumber, err := f.getMapBlockPtr(mapIndex) if err != nil { return nil, err @@ -284,7 +275,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { firstBlockNumber = f.tailBlockNumber } var lastBlockNumber uint64 - if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) { + if mapIndex+1 < uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap) { lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1) if err != nil { return nil, err @@ -345,7 +336,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, erro if fm != nil && fm[rowIndex] != nil { return fm[rowIndex], nil } - row, err := rawdb.ReadFilterMapRow(f.db, mapRowIndex(mapIndex, rowIndex)) + row, err := rawdb.ReadFilterMapRow(f.db, f.mapRowIndex(mapIndex, rowIndex)) if err != nil { return nil, err } @@ -364,9 +355,9 @@ func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uin defer f.filterMapLock.Unlock() if fm := f.filterMapCache[mapIndex]; fm != nil { - (*fm)[rowIndex] = row + fm[rowIndex] = row } - rawdb.WriteFilterMapRow(batch, 
mapRowIndex(mapIndex, rowIndex), []uint32(row)) + rawdb.WriteFilterMapRow(batch, f.mapRowIndex(mapIndex, rowIndex), []uint32(row)) } // mapRowIndex calculates the unified storage index where the given row of the @@ -375,9 +366,9 @@ func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uin // same data proximity reasons it is also suitable for database representation. // See also: // https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure -func mapRowIndex(mapIndex, rowIndex uint32) uint64 { - epochIndex, mapSubIndex := mapIndex>>logMapsPerEpoch, mapIndex%mapsPerEpoch - return (uint64(epochIndex)<>f.logMapsPerEpoch, mapIndex&(f.mapsPerEpoch-1) + return (uint64(epochIndex)< 1 { - q := a / m - m, a = a%m, m - x, y = y, x-q*y - } - if x < 0 { - x += m0 - } - return uint32(x) -} diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index d29debe135..6d94e10b45 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -13,10 +13,10 @@ import ( ) const ( - startLvPointer = valuesPerMap << 31 // log value index assigned to init block - removedPointer = math.MaxUint64 // used in updateBatch to signal removed items - revertPointFrequency = 256 // frequency of revert points in database - cachedRevertPoints = 64 // revert points for most recent blocks in memory + startLvMap = 1 << 31 // map index assigned to init block + removedPointer = math.MaxUint64 // used in updateBatch to signal removed items + revertPointFrequency = 256 // frequency of revert points in database + cachedRevertPoints = 64 // revert points for most recent blocks in memory ) // updateLoop initializes and updates the log index structure according to the @@ -36,7 +36,7 @@ func (f *FilterMaps) updateLoop() { } var ( - headEventCh = make(chan core.ChainHeadEvent) + headEventCh = make(chan core.ChainHeadEvent, 10) sub = f.chain.SubscribeChainHeadEvent(headEventCh) head *types.Header stop bool @@ -231,7 +231,7 @@ func (f *FilterMaps) 
tryUpdateHead(newHead *types.Header) bool { log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err) break } - if update.updatedRangeLength() >= mapsPerEpoch { + if update.updatedRangeLength() >= f.mapsPerEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) update = f.newUpdateBatch() @@ -336,12 +336,12 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) { // pointers from the database. This function also updates targetLvPointer. func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) { fmr := f.getRange() - tailMap := uint32(fmr.tailLvPointer >> logValuesPerMap) - targetMap := uint32(fmr.tailBlockLvPointer >> logValuesPerMap) + tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap) + targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap) if tailMap >= targetMap { return } - lastEpoch := (targetMap - 1) >> logMapsPerEpoch + lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch removeLvPtr, err := f.getMapBlockPtr(tailMap) if err != nil { log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err) @@ -352,12 +352,12 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) { lastLogged time.Time ) for tailMap < targetMap && !stopFn() { - tailEpoch := tailMap >> logMapsPerEpoch + tailEpoch := tailMap >> f.logMapsPerEpoch if tailEpoch == lastEpoch { f.pruneMaps(tailMap, targetMap, &removeLvPtr) break } - nextTailMap := (tailEpoch + 1) << logMapsPerEpoch + nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch f.pruneMaps(tailMap, nextTailMap, &removeLvPtr) tailMap = nextTailMap if !logged || time.Since(lastLogged) >= time.Second*10 { @@ -386,13 +386,13 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) { for mapIndex := first; mapIndex < afterLast; mapIndex++ { f.deleteMapBlockPtr(batch, mapIndex) } - for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ { + for rowIndex := 
uint32(0); rowIndex < f.mapHeight; rowIndex++ { for mapIndex := first; mapIndex < afterLast; mapIndex++ { f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow) } } fmr := f.getRange() - fmr.tailLvPointer = uint64(afterLast) << logValuesPerMap + fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap if fmr.tailLvPointer > fmr.tailBlockLvPointer { log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer) return @@ -407,11 +407,11 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) { // that can be written to the database in a single batch while the in-memory // representations in FilterMaps are also updated. type updateBatch struct { + f *FilterMaps filterMapsRange - maps map[uint32]*filterMap // nil rows are unchanged - getFilterMapRow func(mapIndex, rowIndex uint32) (FilterRow, error) - blockLvPointer map[uint64]uint64 // removedPointer means delete - mapBlockPtr map[uint32]uint64 // removedPointer means delete + maps map[uint32]filterMap // nil rows are unchanged + blockLvPointer map[uint64]uint64 // removedPointer means delete + mapBlockPtr map[uint32]uint64 // removedPointer means delete revertPoints map[uint64]*revertPoint firstMap, afterLastMap uint32 } @@ -422,9 +422,9 @@ func (f *FilterMaps) newUpdateBatch() *updateBatch { defer f.lock.RUnlock() return &updateBatch{ + f: f, filterMapsRange: f.filterMapsRange, - maps: make(map[uint32]*filterMap), - getFilterMapRow: f.getFilterMapRow, + maps: make(map[uint32]filterMap), blockLvPointer: make(map[uint64]uint64), mapBlockPtr: make(map[uint32]uint64), revertPoints: make(map[uint64]*revertPoint), @@ -455,10 +455,10 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) { } } // write filter map rows - for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ { + for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ { for mapIndex := u.firstMap; mapIndex < u.afterLastMap; 
mapIndex++ { if fm := u.maps[mapIndex]; fm != nil { - if row := (*fm)[rowIndex]; row != nil { + if row := fm[rowIndex]; row != nil { f.storeFilterMapRow(batch, mapIndex, rowIndex, row) } } @@ -488,7 +488,7 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) { rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{ BlockHash: rp.blockHash, MapIndex: rp.mapIndex, - RowLength: rp.rowLength[:], + RowLength: rp.rowLength, }) } } @@ -507,7 +507,7 @@ func (u *updateBatch) updatedRangeLength() uint32 { // tailEpoch returns the tail epoch index. func (u *updateBatch) tailEpoch() uint32 { - return uint32(u.tailBlockLvPointer >> (logValuesPerMap + logMapsPerEpoch)) + return uint32(u.tailBlockLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch)) } // getRowPtr returns a pointer to a FilterRow that can be modified. If the batch @@ -517,7 +517,7 @@ func (u *updateBatch) tailEpoch() uint32 { func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) { fm := u.maps[mapIndex] if fm == nil { - fm = new(filterMap) + fm = make(filterMap, u.f.mapHeight) u.maps[mapIndex] = fm if mapIndex < u.firstMap || u.afterLastMap == 0 { u.firstMap = mapIndex @@ -526,9 +526,9 @@ func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) { u.afterLastMap = mapIndex + 1 } } - rowPtr := &(*fm)[rowIndex] + rowPtr := &fm[rowIndex] if *rowPtr == nil { - if filterRow, err := u.getFilterMapRow(mapIndex, rowIndex); err == nil { + if filterRow, err := u.f.getFilterMapRow(mapIndex, rowIndex); err == nil { // filterRow is read only, copy before write *rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8) copy(*rowPtr, filterRow) @@ -545,6 +545,7 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt return errors.New("already initialized") } u.initialized = true + startLvPointer := uint64(startLvMap) << u.f.logValuesPerMap u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer 
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash @@ -554,12 +555,12 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt // addValueToHead adds a single log value to the head of the log index. func (u *updateBatch) addValueToHead(logValue common.Hash) error { - mapIndex := uint32(u.headLvPointer >> logValuesPerMap) - rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue)) + mapIndex := uint32(u.headLvPointer >> u.f.logValuesPerMap) + rowPtr, err := u.getRowPtr(mapIndex, u.f.rowIndex(mapIndex>>u.f.logMapsPerEpoch, logValue)) if err != nil { return err } - column := columnIndex(u.headLvPointer, logValue) + column := u.f.columnIndex(u.headLvPointer, logValue) *rowPtr = append(*rowPtr, column) u.headLvPointer++ return nil @@ -577,11 +578,11 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip } number := header.Number.Uint64() u.blockLvPointer[number] = u.headLvPointer - startMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap) + startMap := uint32((u.headLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap) if err := iterateReceipts(receipts, u.addValueToHead); err != nil { return err } - stopMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap) + stopMap := uint32((u.headLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap) for m := startMap; m < stopMap; m++ { u.mapBlockPtr[m] = number } @@ -610,12 +611,12 @@ func (u *updateBatch) addValueToTail(logValue common.Hash) error { return nil // already added to the map } u.tailLvPointer-- - mapIndex := uint32(u.tailBlockLvPointer >> logValuesPerMap) - rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue)) + mapIndex := uint32(u.tailBlockLvPointer >> u.f.logValuesPerMap) + rowPtr, err := u.getRowPtr(mapIndex, u.f.rowIndex(mapIndex>>u.f.logMapsPerEpoch, 
logValue)) if err != nil { return err } - column := columnIndex(u.tailBlockLvPointer, logValue) + column := u.f.columnIndex(u.tailBlockLvPointer, logValue) *rowPtr = append(*rowPtr, 0) copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1]) (*rowPtr)[0] = column @@ -632,7 +633,7 @@ func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receip return errors.New("addBlockToTail parent mismatch") } number := header.Number.Uint64() - stopMap := uint32((u.tailLvPointer + valuesPerMap - 1) >> logValuesPerMap) + stopMap := uint32((u.tailLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap) var cnt int if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error { cnt++ @@ -640,7 +641,7 @@ func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receip }); err != nil { return err } - startMap := uint32(u.tailLvPointer >> logValuesPerMap) + startMap := uint32(u.tailLvPointer >> u.f.logValuesPerMap) for m := startMap; m < stopMap; m++ { u.mapBlockPtr[m] = number } @@ -693,7 +694,7 @@ type revertPoint struct { blockNumber uint64 blockHash common.Hash mapIndex uint32 - rowLength [mapHeight]uint + rowLength []uint } // makeRevertPoint creates a new revertPoint. 
@@ -701,19 +702,20 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) { rp := &revertPoint{ blockNumber: u.headBlockNumber, blockHash: u.headBlockHash, - mapIndex: uint32(u.headLvPointer >> logValuesPerMap), + mapIndex: uint32(u.headLvPointer >> u.f.logValuesPerMap), + rowLength: make([]uint, u.f.mapHeight), } - if u.tailLvPointer > uint64(rp.mapIndex)< uint64(rp.mapIndex)<> logValuesPerMap) + afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap) if rp.mapIndex >= afterLastMap { return errors.New("cannot revert (head map behind revert point)") } - lvPointer := uint64(rp.mapIndex) << logValuesPerMap - for rowIndex, rowLen := range rp.rowLength[:] { + lvPointer := uint64(rp.mapIndex) << f.logValuesPerMap + for rowIndex, rowLen := range rp.rowLength { rowIndex := uint32(rowIndex) row, err := f.getFilterMapRow(rp.mapIndex, rowIndex) if err != nil { diff --git a/core/filtermaps/matcher.go b/core/filtermaps/matcher.go index 02fb6ed82d..64c7c5efe6 100644 --- a/core/filtermaps/matcher.go +++ b/core/filtermaps/matcher.go @@ -21,6 +21,7 @@ var ErrMatchAll = errors.New("match all patterns not supported") // once EIP-7745 is implemented and active, these functions can also be trustlessly // served by a remote prover. type MatcherBackend interface { + GetParams() *Params GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) @@ -139,6 +140,7 @@ func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock // to that block range might be missing or incorrect. // Also note that the returned list may contain false positives. 
func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) { + params := backend.GetParams() // find the log value index range to search firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock) if err != nil { @@ -151,8 +153,8 @@ func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock if lastIndex > 0 { lastIndex-- } - firstMap, lastMap := uint32(firstIndex>>logValuesPerMap), uint32(lastIndex>>logValuesPerMap) - firstEpoch, lastEpoch := firstMap>>logMapsPerEpoch, lastMap>>logMapsPerEpoch + firstMap, lastMap := uint32(firstIndex>>params.logValuesPerMap), uint32(lastIndex>>params.logValuesPerMap) + firstEpoch, lastEpoch := firstMap>>params.logMapsPerEpoch, lastMap>>params.logMapsPerEpoch // build matcher according to the given filter criteria matchers := make([]matcher, len(topics)+1) @@ -178,13 +180,13 @@ func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock } // matcher is the final sequence matcher that signals a match when all underlying // matchers signal a match for consecutive log value indices. - matcher := newMatchSequence(matchers) + matcher := newMatchSequence(params, matchers) // processEpoch returns the potentially matching logs from the given epoch. processEpoch := func(epochIndex uint32) ([]*types.Log, error) { var logs []*types.Log // create a list of map indices to process - fm, lm := epochIndex<>logMapsPerEpoch, s.value)) + filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, params.rowIndex(mapIndex>>params.logMapsPerEpoch, s.value)) if err != nil { return nil, err } - results[i] = filterRow.potentialMatches(mapIndex, s.value) + results[i] = params.potentialMatches(filterRow, mapIndex, s.value) } return results, nil } @@ -403,6 +406,7 @@ func mergeResults(results []potentialMatches) potentialMatches { // gives a match at X+offset. 
Note that matchSequence can be used recursively to // detect any log value sequence. type matchSequence struct { + params *Params base, next matcher offset uint64 // *EmptyRate == totalCount << 32 + emptyCount (atomically accessed) @@ -412,7 +416,7 @@ type matchSequence struct { // newMatchSequence creates a recursive sequence matcher from a list of underlying // matchers. The resulting matcher signals a match at log value index X when each // underlying matcher matchers[i] returns a match at X+i. -func newMatchSequence(matchers []matcher) matcher { +func newMatchSequence(params *Params, matchers []matcher) matcher { if len(matchers) == 0 { panic("zero length sequence matchers are not allowed") } @@ -420,7 +424,8 @@ func newMatchSequence(matchers []matcher) matcher { return matchers[0] } return &matchSequence{ - base: newMatchSequence(matchers[:len(matchers)-1]), + params: params, + base: newMatchSequence(params, matchers[:len(matchers)-1]), next: matchers[len(matchers)-1], offset: uint64(len(matchers) - 1), } @@ -461,7 +466,7 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([] nextIndices = append(nextIndices, mapIndex) lastAdded = mapIndex } - if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<= (uint64(mapIndex+1)< 0 && next1[len(next1)-1] >= (uint64(mapIndex)< 0 && next2[0] < (uint64(mapIndex+1)< 0 && next1[len(next1)-1] >= (uint64(mapIndex)< 0 && next2[0] < (uint64(mapIndex+1)< f.headBlockNumber { - fm.lastValid = f.headBlockNumber - } - if fm.firstValid > fm.lastValid { - fm.valid = false - } - } +// GetParams returns the filtermaps parameters. +// GetParams implements MatcherBackend. +func (fm *FilterMapsMatcherBackend) GetParams() *Params { + return &fm.f.Params } // Close removes the matcher from the set of active matchers and ensures that // any SyncLogIndex calls are cancelled. +// Close implements MatcherBackend. 
func (fm *FilterMapsMatcherBackend) Close() { fm.f.lock.Lock() defer fm.f.lock.Unlock() @@ -156,3 +139,27 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange return SyncRange{}, ctx.Err() } } + +// updateMatchersValidRange iterates through active matchers and limits their +// valid range with the current indexed range. This function should be called +// whenever a part of the log index has been removed, before adding new blocks +// to it. +func (f *FilterMaps) updateMatchersValidRange() { + for fm := range f.matchers { + if !f.initialized { + fm.valid = false + } + if !fm.valid { + continue + } + if fm.firstValid < f.tailBlockNumber { + fm.firstValid = f.tailBlockNumber + } + if fm.lastValid > f.headBlockNumber { + fm.lastValid = f.headBlockNumber + } + if fm.firstValid > fm.lastValid { + fm.valid = false + } + } +} diff --git a/core/filtermaps/math.go b/core/filtermaps/math.go new file mode 100644 index 0000000000..b0132df913 --- /dev/null +++ b/core/filtermaps/math.go @@ -0,0 +1,180 @@ +package filtermaps + +import ( + "crypto/sha256" + "encoding/binary" + "sort" + + "github.com/ethereum/go-ethereum/common" +) + +type Params struct { + logMapHeight uint // log2(mapHeight) + logMapsPerEpoch uint // log2(mapsPerEpoch) + logValuesPerMap uint // log2(valuesPerMap) + // derived fields + mapHeight uint32 // filter map height (number of rows) + mapsPerEpoch uint32 // number of maps in an epoch + valuesPerMap uint64 // number of log values marked on each filter map +} + +var DefaultParams = Params{ + logMapHeight: 12, + logMapsPerEpoch: 6, + logValuesPerMap: 16, +} + +func (p *Params) deriveFields() { + p.mapHeight = uint32(1) << p.logMapHeight + p.mapsPerEpoch = uint32(1) << p.logMapsPerEpoch + p.valuesPerMap = uint64(1) << p.logValuesPerMap +} + +// addressValue returns the log value hash of a log emitting address. 
+func addressValue(address common.Address) common.Hash { + var result common.Hash + hasher := sha256.New() + hasher.Write(address[:]) + hasher.Sum(result[:0]) + return result +} + +// topicValue returns the log value hash of a log topic. +func topicValue(topic common.Hash) common.Hash { + var result common.Hash + hasher := sha256.New() + hasher.Write(topic[:]) + hasher.Sum(result[:0]) + return result +} + +// rowIndex returns the row index in which the given log value should be marked +// during the given epoch. Note that row assignments are re-shuffled in every +// epoch in order to ensure that even though there are always a few more heavily +// used rows due to very popular addresses and topics, these will not make search +// for other log values very expensive. Even if certain values are occasionally +// sorted into these heavy rows, in most of the epochs they are placed in average +// length rows. +func (p *Params) rowIndex(epochIndex uint32, logValue common.Hash) uint32 { + hasher := sha256.New() + hasher.Write(logValue[:]) + var indexEnc [4]byte + binary.LittleEndian.PutUint32(indexEnc[:], epochIndex) + hasher.Write(indexEnc[:]) + var hash common.Hash + hasher.Sum(hash[:0]) + return binary.LittleEndian.Uint32(hash[:4]) % p.mapHeight +} + +// columnIndex returns the column index that should be added to the appropriate +// row in order to place a mark for the next log value. 
+func (p *Params) columnIndex(lvIndex uint64, logValue common.Hash) uint32 { + x := uint32(lvIndex % p.valuesPerMap) // log value sub-index + transformHash := transformHash(uint32(lvIndex/p.valuesPerMap), logValue) + // apply column index transformation function + x += binary.LittleEndian.Uint32(transformHash[0:4]) + x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1 + x ^= binary.LittleEndian.Uint32(transformHash[8:12]) + x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1 + x += binary.LittleEndian.Uint32(transformHash[16:20]) + x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1 + x ^= binary.LittleEndian.Uint32(transformHash[24:28]) + x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1 + return x +} + +// transformHash calculates a hash specific to a given map and log value hash +// that defines a bijective function on the uint32 range. This function is used +// to transform the log value sub-index (distance from the first index of the map) +// into a 32 bit column index, then applied in reverse when searching for potential +// matches for a given log value. +func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) { + hasher := sha256.New() + hasher.Write(logValue[:]) + var indexEnc [4]byte + binary.LittleEndian.PutUint32(indexEnc[:], mapIndex) + hasher.Write(indexEnc[:]) + hasher.Sum(result[:0]) + return +} + +// potentialMatches returns the list of log value indices potentially matching +// the given log value hash in the range of the filter map the row belongs to. +// Note that the list of indices is always sorted and potential duplicates are +// removed. Though the column indices are stored in the same order they were +// added and therefore the true matches are automatically reverse transformed +// in the right order, false positives can ruin this property. 
Since these can +// only be separated from true matches after the combined pattern matching of the +// outputs of individual log value matchers and this pattern matcher assumes a +// sorted and duplicate-free list of indices, we should ensure these properties +// here. +func (p *Params) potentialMatches(row FilterRow, mapIndex uint32, logValue common.Hash) potentialMatches { + results := make(potentialMatches, 0, 8) + transformHash := transformHash(mapIndex, logValue) + sub1 := binary.LittleEndian.Uint32(transformHash[0:4]) + mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1) + xor1 := binary.LittleEndian.Uint32(transformHash[8:12]) + mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1) + sub2 := binary.LittleEndian.Uint32(transformHash[16:20]) + mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1) + xor2 := binary.LittleEndian.Uint32(transformHash[24:28]) + mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1) + // perform reverse column index transformation on all column indices of the row. + // if a column index was added by the searched log value then the reverse + // transform will yield a valid log value sub-index of the given map. + // Column index is 32 bits long while there are 2**16 valid log value indices + // in the map's range, so this can also happen by accident with 1 in 2**16 + // chance, in which case we have a false positive. 
+ for _, columnIndex := range row { + if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < uint32(p.valuesPerMap) { + results = append(results, uint64(mapIndex)< 1 { + q := a / m + m, a = a%m, m + x, y = y, x-q*y + } + if x < 0 { + x += m0 + } + return uint32(x) +} diff --git a/core/filtermaps/filtermaps_test.go b/core/filtermaps/math_test.go similarity index 78% rename from core/filtermaps/filtermaps_test.go rename to core/filtermaps/math_test.go index 70a4ce3b14..5cf76dd34a 100644 --- a/core/filtermaps/filtermaps_test.go +++ b/core/filtermaps/math_test.go @@ -8,14 +8,17 @@ import ( ) func TestSingleMatch(t *testing.T) { + params := DefaultParams + params.deriveFields() + for count := 0; count < 100000; count++ { // generate a row with a single random entry mapIndex := rand.Uint32() - lvIndex := uint64(mapIndex)<> 32) if falsePositives < expFalse/2 || falsePositives > expFalse*3/2 { t.Fatalf("False positive rate out of expected range (got %d, expected %d +-50%%)", falsePositives, expFalse) } diff --git a/eth/backend.go b/eth/backend.go index 442f97eb2d..b5a5be3994 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, config.LogHistory, config.LogNoHistory) + eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, config.LogNoHistory) if config.BlobPool.Datadir != "" { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)