core/filtermaps: moved math stuff to separate file, added Params

Zsolt Felfoldi 5 months ago
parent e82c9994c1
commit d11db22a96
  1. core/filtermaps/filtermaps.go (200 lines changed)
  2. core/filtermaps/indexer.go (101 lines changed)
  3. core/filtermaps/matcher.go (39 lines changed)
  4. core/filtermaps/matcher_backend.go (51 lines changed)
  5. core/filtermaps/math.go (180 lines changed)
  6. core/filtermaps/math_test.go (30 lines changed)
  7. eth/backend.go (2 lines changed)

@ -1,10 +1,7 @@
package filtermaps package filtermaps
import ( import (
"crypto/sha256"
"encoding/binary"
"errors" "errors"
"sort"
"sync" "sync"
"time" "time"
@ -18,16 +15,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const ( const headCacheSize = 8 // maximum number of recent filter maps cached in memory
logMapHeight = 12 // log2(mapHeight)
mapHeight = 1 << logMapHeight // filter map height (number of rows)
logMapsPerEpoch = 6 // log2(mapsPerEpoch)
mapsPerEpoch = 1 << logMapsPerEpoch // number of maps in an epoch
logValuesPerMap = 16 // log2(valuesPerMap)
valuesPerMap = 1 << logValuesPerMap // number of log values marked on each filter map
headCacheSize = 8 // maximum number of recent filter maps cached in memory
)
// blockchain defines functions required by the FilterMaps log indexer. // blockchain defines functions required by the FilterMaps log indexer.
type blockchain interface { type blockchain interface {
@ -51,6 +39,7 @@ type FilterMaps struct {
history uint64 history uint64
noHistory bool noHistory bool
Params
filterMapsRange filterMapsRange
chain blockchain chain blockchain
matcherSyncCh chan *FilterMapsMatcherBackend matcherSyncCh chan *FilterMapsMatcherBackend
@ -60,7 +49,7 @@ type FilterMaps struct {
// while updating the structure. Note that the set of cached maps depends // while updating the structure. Note that the set of cached maps depends
// only on filterMapsRange and rows of other maps are not cached here. // only on filterMapsRange and rows of other maps are not cached here.
filterMapLock sync.Mutex filterMapLock sync.Mutex
filterMapCache map[uint32]*filterMap filterMapCache map[uint32]filterMap
blockPtrCache *lru.Cache[uint32, uint64] blockPtrCache *lru.Cache[uint32, uint64]
lvPointerCache *lru.Cache[uint64, uint64] lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint revertPoints map[uint64]*revertPoint
@ -73,7 +62,7 @@ type FilterMaps struct {
// It can be used as a memory cache or an overlay while preparing a batch of // It can be used as a memory cache or an overlay while preparing a batch of
// changes to the structure. In either case a nil value should be interpreted // changes to the structure. In either case a nil value should be interpreted
// as transparent (uncached/unchanged). // as transparent (uncached/unchanged).
type filterMap [mapHeight]FilterRow type filterMap []FilterRow
// FilterRow encodes a single row of a filter map as a list of column indices. // FilterRow encodes a single row of a filter map as a list of column indices.
// Note that the values are always stored in the same order as they were added // Note that the values are always stored in the same order as they were added
@ -105,17 +94,19 @@ type filterMapsRange struct {
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain. // the structure in sync with the given blockchain.
func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistory bool) *FilterMaps { func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
rs, err := rawdb.ReadFilterMapsRange(db) rs, err := rawdb.ReadFilterMapsRange(db)
if err != nil { if err != nil {
log.Error("Error reading log index range", "error", err) log.Error("Error reading log index range", "error", err)
} }
params.deriveFields()
fm := &FilterMaps{ fm := &FilterMaps{
db: db, db: db,
chain: chain, chain: chain,
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
history: history, history: history,
noHistory: noHistory, noHistory: noHistory,
Params: params,
filterMapsRange: filterMapsRange{ filterMapsRange: filterMapsRange{
initialized: rs.Initialized, initialized: rs.Initialized,
headLvPointer: rs.HeadLvPointer, headLvPointer: rs.HeadLvPointer,
@ -127,7 +118,7 @@ func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistor
}, },
matcherSyncCh: make(chan *FilterMapsMatcherBackend), matcherSyncCh: make(chan *FilterMapsMatcherBackend),
matchers: make(map[*FilterMapsMatcherBackend]struct{}), matchers: make(map[*FilterMapsMatcherBackend]struct{}),
filterMapCache: make(map[uint32]*filterMap), filterMapCache: make(map[uint32]filterMap),
blockPtrCache: lru.NewCache[uint32, uint64](1000), blockPtrCache: lru.NewCache[uint32, uint64](1000),
lvPointerCache: lru.NewCache[uint64, uint64](1000), lvPointerCache: lru.NewCache[uint64, uint64](1000),
revertPoints: make(map[uint64]*revertPoint), revertPoints: make(map[uint64]*revertPoint),
@ -154,7 +145,7 @@ func (f *FilterMaps) Close() {
func (f *FilterMaps) reset() bool { func (f *FilterMaps) reset() bool {
f.lock.Lock() f.lock.Lock()
f.filterMapsRange = filterMapsRange{} f.filterMapsRange = filterMapsRange{}
f.filterMapCache = make(map[uint32]*filterMap) f.filterMapCache = make(map[uint32]filterMap)
f.revertPoints = make(map[uint64]*revertPoint) f.revertPoints = make(map[uint64]*revertPoint)
f.blockPtrCache.Purge() f.blockPtrCache.Purge()
f.lvPointerCache.Purge() f.lvPointerCache.Purge()
@ -242,21 +233,21 @@ func (f *FilterMaps) updateMapCache() {
f.filterMapLock.Lock() f.filterMapLock.Lock()
defer f.filterMapLock.Unlock() defer f.filterMapLock.Unlock()
newFilterMapCache := make(map[uint32]*filterMap) newFilterMapCache := make(map[uint32]filterMap)
firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>f.logValuesPerMap), uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap)
headCacheFirst := firstMap + 1 headCacheFirst := firstMap + 1
if afterLastMap > headCacheFirst+headCacheSize { if afterLastMap > headCacheFirst+headCacheSize {
headCacheFirst = afterLastMap - headCacheSize headCacheFirst = afterLastMap - headCacheSize
} }
fm := f.filterMapCache[firstMap] fm := f.filterMapCache[firstMap]
if fm == nil { if fm == nil {
fm = new(filterMap) fm = make(filterMap, f.mapHeight)
} }
newFilterMapCache[firstMap] = fm newFilterMapCache[firstMap] = fm
for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ { for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ {
fm := f.filterMapCache[mapIndex] fm := f.filterMapCache[mapIndex]
if fm == nil { if fm == nil {
fm = new(filterMap) fm = make(filterMap, f.mapHeight)
} }
newFilterMapCache[mapIndex] = fm newFilterMapCache[mapIndex] = fm
} }
@ -275,7 +266,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
return nil, nil return nil, nil
} }
// find possible block range based on map to block pointers // find possible block range based on map to block pointers
mapIndex := uint32(lvIndex >> logValuesPerMap) mapIndex := uint32(lvIndex >> f.logValuesPerMap)
firstBlockNumber, err := f.getMapBlockPtr(mapIndex) firstBlockNumber, err := f.getMapBlockPtr(mapIndex)
if err != nil { if err != nil {
return nil, err return nil, err
@ -284,7 +275,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
firstBlockNumber = f.tailBlockNumber firstBlockNumber = f.tailBlockNumber
} }
var lastBlockNumber uint64 var lastBlockNumber uint64
if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) { if mapIndex+1 < uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap) {
lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1) lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
if err != nil { if err != nil {
return nil, err return nil, err
@ -345,7 +336,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, erro
if fm != nil && fm[rowIndex] != nil { if fm != nil && fm[rowIndex] != nil {
return fm[rowIndex], nil return fm[rowIndex], nil
} }
row, err := rawdb.ReadFilterMapRow(f.db, mapRowIndex(mapIndex, rowIndex)) row, err := rawdb.ReadFilterMapRow(f.db, f.mapRowIndex(mapIndex, rowIndex))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -364,9 +355,9 @@ func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uin
defer f.filterMapLock.Unlock() defer f.filterMapLock.Unlock()
if fm := f.filterMapCache[mapIndex]; fm != nil { if fm := f.filterMapCache[mapIndex]; fm != nil {
(*fm)[rowIndex] = row fm[rowIndex] = row
} }
rawdb.WriteFilterMapRow(batch, mapRowIndex(mapIndex, rowIndex), []uint32(row)) rawdb.WriteFilterMapRow(batch, f.mapRowIndex(mapIndex, rowIndex), []uint32(row))
} }
// mapRowIndex calculates the unified storage index where the given row of the // mapRowIndex calculates the unified storage index where the given row of the
@ -375,9 +366,9 @@ func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uin
// same data proximity reasons it is also suitable for database representation. // same data proximity reasons it is also suitable for database representation.
// See also: // See also:
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure // https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
func mapRowIndex(mapIndex, rowIndex uint32) uint64 { func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
epochIndex, mapSubIndex := mapIndex>>logMapsPerEpoch, mapIndex%mapsPerEpoch epochIndex, mapSubIndex := mapIndex>>f.logMapsPerEpoch, mapIndex&(f.mapsPerEpoch-1)
return (uint64(epochIndex)<<logMapHeight+uint64(rowIndex))<<logMapsPerEpoch + uint64(mapSubIndex) return (uint64(epochIndex)<<f.logMapHeight+uint64(rowIndex))<<f.logMapsPerEpoch + uint64(mapSubIndex)
} }
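A worked example makes the layout concrete. Assuming the default parameters (logMapHeight = 12, logMapsPerEpoch = 6); the variable names are local to this sketch:

    // map 70 is map 6 of epoch 1; row 5 of that map is stored at:
    mapIndex, rowIndex := uint32(70), uint32(5)
    epochIndex, mapSubIndex := mapIndex>>6, mapIndex&(64-1) // 1, 6
    storageIndex := (uint64(epochIndex)<<12+uint64(rowIndex))<<6 + uint64(mapSubIndex)
    // storageIndex = (1<<12 + 5)*64 + 6 = 262470

Since mapSubIndex occupies the lowest bits, the same row of consecutive maps within an epoch ends up adjacent in storage, which is why the prune and batch-write loops below iterate row-first and map-second.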
// getBlockLvPointer returns the starting log value index where the log values // getBlockLvPointer returns the starting log value index where the log values
@ -440,152 +431,3 @@ func (f *FilterMaps) deleteMapBlockPtr(batch ethdb.Batch, mapIndex uint32) {
f.blockPtrCache.Remove(mapIndex) f.blockPtrCache.Remove(mapIndex)
rawdb.DeleteFilterMapBlockPtr(batch, mapIndex) rawdb.DeleteFilterMapBlockPtr(batch, mapIndex)
} }
// addressValue returns the log value hash of a log emitting address.
func addressValue(address common.Address) common.Hash {
var result common.Hash
hasher := sha256.New()
hasher.Write(address[:])
hasher.Sum(result[:0])
return result
}
// topicValue returns the log value hash of a log topic.
func topicValue(topic common.Hash) common.Hash {
var result common.Hash
hasher := sha256.New()
hasher.Write(topic[:])
hasher.Sum(result[:0])
return result
}
// rowIndex returns the row index in which the given log value should be marked
// during the given epoch. Note that row assignments are re-shuffled in every
// epoch in order to ensure that even though there are always a few more heavily
// used rows due to very popular addresses and topics, these will not make search
// for other log values very expensive. Even if certain values are occasionally
// sorted into these heavy rows, in most of the epochs they are placed in average
// length rows.
func rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
hasher := sha256.New()
hasher.Write(logValue[:])
var indexEnc [4]byte
binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
hasher.Write(indexEnc[:])
var hash common.Hash
hasher.Sum(hash[:0])
return binary.LittleEndian.Uint32(hash[:4]) % mapHeight
}
// columnIndex returns the column index that should be added to the appropriate
// row in order to place a mark for the next log value.
func columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
x := uint32(lvIndex % valuesPerMap) // log value sub-index
transformHash := transformHash(uint32(lvIndex/valuesPerMap), logValue)
// apply column index transformation function
x += binary.LittleEndian.Uint32(transformHash[0:4])
x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
x ^= binary.LittleEndian.Uint32(transformHash[8:12])
x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
x += binary.LittleEndian.Uint32(transformHash[16:20])
x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
x ^= binary.LittleEndian.Uint32(transformHash[24:28])
x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
return x
}
// transformHash calculates a hash specific to a given map and log value hash
// that defines a bijective function on the uint32 range. This function is used
// to transform the log value sub-index (distance from the first index of the map)
// into a 32 bit column index, then applied in reverse when searching for potential
// matches for a given log value.
func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
hasher := sha256.New()
hasher.Write(logValue[:])
var indexEnc [4]byte
binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
hasher.Write(indexEnc[:])
hasher.Sum(result[:0])
return
}
// potentialMatches returns the list of log value indices potentially matching
// the given log value hash in the range of the filter map the row belongs to.
// Note that the list of indices is always sorted and potential duplicates are
// removed. Though the column indices are stored in the same order they were
// added and therefore the true matches are automatically reverse transformed
// in the right order, false positives can ruin this property. Since these can
// only be separated from true matches after the combined pattern matching of the
// outputs of individual log value matchers and this pattern matcher assumes a
// sorted and duplicate-free list of indices, we should ensure these properties
// here.
func (row FilterRow) potentialMatches(mapIndex uint32, logValue common.Hash) potentialMatches {
results := make(potentialMatches, 0, 8)
transformHash := transformHash(mapIndex, logValue)
sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
// perform reverse column index transformation on all column indices of the row.
// if a column index was added by the searched log value then the reverse
// transform will yield a valid log value sub-index of the given map.
// Column index is 32 bits long while there are 2**16 valid log value indices
// in the map's range, so this can also happen by accident with 1 in 2**16
// chance, in which case we have a false positive.
for _, columnIndex := range row {
if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < valuesPerMap {
results = append(results, uint64(mapIndex)*valuesPerMap+uint64(potentialSubIndex))
}
}
sort.Sort(results)
// remove duplicates
j := 0
for i, match := range results {
if i == 0 || match != results[i-1] {
results[j] = results[i]
j++
}
}
return results[:j]
}
// potentialMatches is a strictly monotonically increasing list of log value
// indices in the range of a filter map that are potential matches for certain
// filter criteria.
// Note that nil is used as a wildcard and therefore means that all log value
// indices in the filter map range are potential matches. If there are no
// potential matches in the given map's range then an empty slice should be used.
type potentialMatches []uint64
// noMatches means there are no potential matches in a given filter map's range.
var noMatches = potentialMatches{}
func (p potentialMatches) Len() int { return len(p) }
func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
func (p potentialMatches) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// uint32ModInverse takes an odd 32 bit number and returns its modular
// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y
// x * y * uint32ModInverse(y) == x.
func uint32ModInverse(v uint32) uint32 {
if v&1 == 0 {
panic("uint32ModInverse called with even argument")
}
m := int64(1) << 32
m0 := m
a := int64(v)
x, y := int64(1), int64(0)
for a > 1 {
q := a / m
m, a = a%m, m
x, y = y, x-q*y
}
if x < 0 {
x += m0
}
return uint32(x)
}

@ -13,10 +13,10 @@ import (
) )
const ( const (
startLvPointer = valuesPerMap << 31 // log value index assigned to init block startLvMap = 1 << 31 // map index assigned to init block
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory cachedRevertPoints = 64 // revert points for most recent blocks in memory
) )
// updateLoop initializes and updates the log index structure according to the // updateLoop initializes and updates the log index structure according to the
@ -36,7 +36,7 @@ func (f *FilterMaps) updateLoop() {
} }
var ( var (
headEventCh = make(chan core.ChainHeadEvent) headEventCh = make(chan core.ChainHeadEvent, 10)
sub = f.chain.SubscribeChainHeadEvent(headEventCh) sub = f.chain.SubscribeChainHeadEvent(headEventCh)
head *types.Header head *types.Header
stop bool stop bool
@ -231,7 +231,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err) log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
break break
} }
if update.updatedRangeLength() >= mapsPerEpoch { if update.updatedRangeLength() >= f.mapsPerEpoch {
// limit the amount of data updated in a single batch // limit the amount of data updated in a single batch
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
update = f.newUpdateBatch() update = f.newUpdateBatch()
@ -336,12 +336,12 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
// pointers from the database. This function also updates targetLvPointer. // pointers from the database. This function also updates targetLvPointer.
func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) { func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
fmr := f.getRange() fmr := f.getRange()
tailMap := uint32(fmr.tailLvPointer >> logValuesPerMap) tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap)
targetMap := uint32(fmr.tailBlockLvPointer >> logValuesPerMap) targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap)
if tailMap >= targetMap { if tailMap >= targetMap {
return return
} }
lastEpoch := (targetMap - 1) >> logMapsPerEpoch lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch
removeLvPtr, err := f.getMapBlockPtr(tailMap) removeLvPtr, err := f.getMapBlockPtr(tailMap)
if err != nil { if err != nil {
log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err) log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err)
@ -352,12 +352,12 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
lastLogged time.Time lastLogged time.Time
) )
for tailMap < targetMap && !stopFn() { for tailMap < targetMap && !stopFn() {
tailEpoch := tailMap >> logMapsPerEpoch tailEpoch := tailMap >> f.logMapsPerEpoch
if tailEpoch == lastEpoch { if tailEpoch == lastEpoch {
f.pruneMaps(tailMap, targetMap, &removeLvPtr) f.pruneMaps(tailMap, targetMap, &removeLvPtr)
break break
} }
nextTailMap := (tailEpoch + 1) << logMapsPerEpoch nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch
f.pruneMaps(tailMap, nextTailMap, &removeLvPtr) f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
tailMap = nextTailMap tailMap = nextTailMap
if !logged || time.Since(lastLogged) >= time.Second*10 { if !logged || time.Since(lastLogged) >= time.Second*10 {
@ -386,13 +386,13 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
for mapIndex := first; mapIndex < afterLast; mapIndex++ { for mapIndex := first; mapIndex < afterLast; mapIndex++ {
f.deleteMapBlockPtr(batch, mapIndex) f.deleteMapBlockPtr(batch, mapIndex)
} }
for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ { for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ {
for mapIndex := first; mapIndex < afterLast; mapIndex++ { for mapIndex := first; mapIndex < afterLast; mapIndex++ {
f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow) f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
} }
} }
fmr := f.getRange() fmr := f.getRange()
fmr.tailLvPointer = uint64(afterLast) << logValuesPerMap fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap
if fmr.tailLvPointer > fmr.tailBlockLvPointer { if fmr.tailLvPointer > fmr.tailBlockLvPointer {
log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer) log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
return return
@ -407,11 +407,11 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
// that can be written to the database in a single batch while the in-memory // that can be written to the database in a single batch while the in-memory
// representations in FilterMaps are also updated. // representations in FilterMaps are also updated.
type updateBatch struct { type updateBatch struct {
f *FilterMaps
filterMapsRange filterMapsRange
maps map[uint32]*filterMap // nil rows are unchanged maps map[uint32]filterMap // nil rows are unchanged
getFilterMapRow func(mapIndex, rowIndex uint32) (FilterRow, error) blockLvPointer map[uint64]uint64 // removedPointer means delete
blockLvPointer map[uint64]uint64 // removedPointer means delete mapBlockPtr map[uint32]uint64 // removedPointer means delete
mapBlockPtr map[uint32]uint64 // removedPointer means delete
revertPoints map[uint64]*revertPoint revertPoints map[uint64]*revertPoint
firstMap, afterLastMap uint32 firstMap, afterLastMap uint32
} }
@ -422,9 +422,9 @@ func (f *FilterMaps) newUpdateBatch() *updateBatch {
defer f.lock.RUnlock() defer f.lock.RUnlock()
return &updateBatch{ return &updateBatch{
f: f,
filterMapsRange: f.filterMapsRange, filterMapsRange: f.filterMapsRange,
maps: make(map[uint32]*filterMap), maps: make(map[uint32]filterMap),
getFilterMapRow: f.getFilterMapRow,
blockLvPointer: make(map[uint64]uint64), blockLvPointer: make(map[uint64]uint64),
mapBlockPtr: make(map[uint32]uint64), mapBlockPtr: make(map[uint32]uint64),
revertPoints: make(map[uint64]*revertPoint), revertPoints: make(map[uint64]*revertPoint),
@ -455,10 +455,10 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
} }
} }
// write filter map rows // write filter map rows
for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ { for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ {
for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ { for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ {
if fm := u.maps[mapIndex]; fm != nil { if fm := u.maps[mapIndex]; fm != nil {
if row := (*fm)[rowIndex]; row != nil { if row := fm[rowIndex]; row != nil {
f.storeFilterMapRow(batch, mapIndex, rowIndex, row) f.storeFilterMapRow(batch, mapIndex, rowIndex, row)
} }
} }
@ -488,7 +488,7 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{ rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{
BlockHash: rp.blockHash, BlockHash: rp.blockHash,
MapIndex: rp.mapIndex, MapIndex: rp.mapIndex,
RowLength: rp.rowLength[:], RowLength: rp.rowLength,
}) })
} }
} }
@ -507,7 +507,7 @@ func (u *updateBatch) updatedRangeLength() uint32 {
// tailEpoch returns the tail epoch index. // tailEpoch returns the tail epoch index.
func (u *updateBatch) tailEpoch() uint32 { func (u *updateBatch) tailEpoch() uint32 {
return uint32(u.tailBlockLvPointer >> (logValuesPerMap + logMapsPerEpoch)) return uint32(u.tailBlockLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch))
} }
// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch // getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
@ -517,7 +517,7 @@ func (u *updateBatch) tailEpoch() uint32 {
func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) { func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
fm := u.maps[mapIndex] fm := u.maps[mapIndex]
if fm == nil { if fm == nil {
fm = new(filterMap) fm = make(filterMap, u.f.mapHeight)
u.maps[mapIndex] = fm u.maps[mapIndex] = fm
if mapIndex < u.firstMap || u.afterLastMap == 0 { if mapIndex < u.firstMap || u.afterLastMap == 0 {
u.firstMap = mapIndex u.firstMap = mapIndex
@ -526,9 +526,9 @@ func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
u.afterLastMap = mapIndex + 1 u.afterLastMap = mapIndex + 1
} }
} }
rowPtr := &(*fm)[rowIndex] rowPtr := &fm[rowIndex]
if *rowPtr == nil { if *rowPtr == nil {
if filterRow, err := u.getFilterMapRow(mapIndex, rowIndex); err == nil { if filterRow, err := u.f.getFilterMapRow(mapIndex, rowIndex); err == nil {
// filterRow is read only, copy before write // filterRow is read only, copy before write
*rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8) *rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8)
copy(*rowPtr, filterRow) copy(*rowPtr, filterRow)
@ -545,6 +545,7 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt
return errors.New("already initialized") return errors.New("already initialized")
} }
u.initialized = true u.initialized = true
startLvPointer := uint64(startLvMap) << u.f.logValuesPerMap
u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
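A quick sanity check on the refactored start pointer, with the default logValuesPerMap = 16: the old constant was startLvPointer = valuesPerMap << 31 = 2^16 * 2^31 = 2^47, while the new form computes startLvMap << logValuesPerMap = 2^31 * 2^16 = 2^47, so the log value index assigned to the init block is unchanged; the constant is now expressed in map units so that it stays valid for any logValuesPerMap.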
@ -554,12 +555,12 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt
// addValueToHead adds a single log value to the head of the log index. // addValueToHead adds a single log value to the head of the log index.
func (u *updateBatch) addValueToHead(logValue common.Hash) error { func (u *updateBatch) addValueToHead(logValue common.Hash) error {
mapIndex := uint32(u.headLvPointer >> logValuesPerMap) mapIndex := uint32(u.headLvPointer >> u.f.logValuesPerMap)
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue)) rowPtr, err := u.getRowPtr(mapIndex, u.f.rowIndex(mapIndex>>u.f.logMapsPerEpoch, logValue))
if err != nil { if err != nil {
return err return err
} }
column := columnIndex(u.headLvPointer, logValue) column := u.f.columnIndex(u.headLvPointer, logValue)
*rowPtr = append(*rowPtr, column) *rowPtr = append(*rowPtr, column)
u.headLvPointer++ u.headLvPointer++
return nil return nil
@ -577,11 +578,11 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip
} }
number := header.Number.Uint64() number := header.Number.Uint64()
u.blockLvPointer[number] = u.headLvPointer u.blockLvPointer[number] = u.headLvPointer
startMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap) startMap := uint32((u.headLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap)
if err := iterateReceipts(receipts, u.addValueToHead); err != nil { if err := iterateReceipts(receipts, u.addValueToHead); err != nil {
return err return err
} }
stopMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap) stopMap := uint32((u.headLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap)
for m := startMap; m < stopMap; m++ { for m := startMap; m < stopMap; m++ {
u.mapBlockPtr[m] = number u.mapBlockPtr[m] = number
} }
@ -610,12 +611,12 @@ func (u *updateBatch) addValueToTail(logValue common.Hash) error {
return nil // already added to the map return nil // already added to the map
} }
u.tailLvPointer-- u.tailLvPointer--
mapIndex := uint32(u.tailBlockLvPointer >> logValuesPerMap) mapIndex := uint32(u.tailBlockLvPointer >> u.f.logValuesPerMap)
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue)) rowPtr, err := u.getRowPtr(mapIndex, u.f.rowIndex(mapIndex>>u.f.logMapsPerEpoch, logValue))
if err != nil { if err != nil {
return err return err
} }
column := columnIndex(u.tailBlockLvPointer, logValue) column := u.f.columnIndex(u.tailBlockLvPointer, logValue)
*rowPtr = append(*rowPtr, 0) *rowPtr = append(*rowPtr, 0)
copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1]) copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
(*rowPtr)[0] = column (*rowPtr)[0] = column
@ -632,7 +633,7 @@ func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receip
return errors.New("addBlockToTail parent mismatch") return errors.New("addBlockToTail parent mismatch")
} }
number := header.Number.Uint64() number := header.Number.Uint64()
stopMap := uint32((u.tailLvPointer + valuesPerMap - 1) >> logValuesPerMap) stopMap := uint32((u.tailLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap)
var cnt int var cnt int
if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error { if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error {
cnt++ cnt++
@ -640,7 +641,7 @@ func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receip
}); err != nil { }); err != nil {
return err return err
} }
startMap := uint32(u.tailLvPointer >> logValuesPerMap) startMap := uint32(u.tailLvPointer >> u.f.logValuesPerMap)
for m := startMap; m < stopMap; m++ { for m := startMap; m < stopMap; m++ {
u.mapBlockPtr[m] = number u.mapBlockPtr[m] = number
} }
@ -693,7 +694,7 @@ type revertPoint struct {
blockNumber uint64 blockNumber uint64
blockHash common.Hash blockHash common.Hash
mapIndex uint32 mapIndex uint32
rowLength [mapHeight]uint rowLength []uint
} }
// makeRevertPoint creates a new revertPoint. // makeRevertPoint creates a new revertPoint.
@ -701,19 +702,20 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
rp := &revertPoint{ rp := &revertPoint{
blockNumber: u.headBlockNumber, blockNumber: u.headBlockNumber,
blockHash: u.headBlockHash, blockHash: u.headBlockHash,
mapIndex: uint32(u.headLvPointer >> logValuesPerMap), mapIndex: uint32(u.headLvPointer >> u.f.logValuesPerMap),
rowLength: make([]uint, u.f.mapHeight),
} }
if u.tailLvPointer > uint64(rp.mapIndex)<<logValuesPerMap { if u.tailLvPointer > uint64(rp.mapIndex)<<u.f.logValuesPerMap {
return nil, nil return nil, nil
} }
for i := range rp.rowLength[:] { for i := range rp.rowLength {
var row FilterRow var row FilterRow
if m := u.maps[rp.mapIndex]; m != nil { if m := u.maps[rp.mapIndex]; m != nil {
row = (*m)[i] row = m[i]
} }
if row == nil { if row == nil {
var err error var err error
row, err = u.getFilterMapRow(rp.mapIndex, uint32(i)) row, err = u.f.getFilterMapRow(rp.mapIndex, uint32(i))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -744,16 +746,15 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
if rps == nil { if rps == nil {
return nil, nil return nil, nil
} }
if len(rps.RowLength) != mapHeight { if uint32(len(rps.RowLength)) != f.mapHeight {
return nil, errors.New("invalid number of rows in stored revert point") return nil, errors.New("invalid number of rows in stored revert point")
} }
rp := &revertPoint{ return &revertPoint{
blockNumber: blockNumber, blockNumber: blockNumber,
blockHash: rps.BlockHash, blockHash: rps.BlockHash,
mapIndex: rps.MapIndex, mapIndex: rps.MapIndex,
} rowLength: rps.RowLength,
copy(rp.rowLength[:], rps.RowLength) }, nil
return rp, nil
} }
// revertTo reverts the log index to the given revert point. // revertTo reverts the log index to the given revert point.
@ -762,12 +763,12 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
defer f.lock.Unlock() defer f.lock.Unlock()
batch := f.db.NewBatch() batch := f.db.NewBatch()
afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap) afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap)
if rp.mapIndex >= afterLastMap { if rp.mapIndex >= afterLastMap {
return errors.New("cannot revert (head map behind revert point)") return errors.New("cannot revert (head map behind revert point)")
} }
lvPointer := uint64(rp.mapIndex) << logValuesPerMap lvPointer := uint64(rp.mapIndex) << f.logValuesPerMap
for rowIndex, rowLen := range rp.rowLength[:] { for rowIndex, rowLen := range rp.rowLength {
rowIndex := uint32(rowIndex) rowIndex := uint32(rowIndex)
row, err := f.getFilterMapRow(rp.mapIndex, rowIndex) row, err := f.getFilterMapRow(rp.mapIndex, rowIndex)
if err != nil { if err != nil {

@ -21,6 +21,7 @@ var ErrMatchAll = errors.New("match all patterns not supported")
// once EIP-7745 is implemented and active, these functions can also be trustlessly // once EIP-7745 is implemented and active, these functions can also be trustlessly
// served by a remote prover. // served by a remote prover.
type MatcherBackend interface { type MatcherBackend interface {
GetParams() *Params
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
@ -139,6 +140,7 @@ func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
// to that block range might be missing or incorrect. // to that block range might be missing or incorrect.
// Also note that the returned list may contain false positives. // Also note that the returned list may contain false positives.
func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) { func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
params := backend.GetParams()
// find the log value index range to search // find the log value index range to search
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock) firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
if err != nil { if err != nil {
@ -151,8 +153,8 @@ func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
if lastIndex > 0 { if lastIndex > 0 {
lastIndex-- lastIndex--
} }
firstMap, lastMap := uint32(firstIndex>>logValuesPerMap), uint32(lastIndex>>logValuesPerMap) firstMap, lastMap := uint32(firstIndex>>params.logValuesPerMap), uint32(lastIndex>>params.logValuesPerMap)
firstEpoch, lastEpoch := firstMap>>logMapsPerEpoch, lastMap>>logMapsPerEpoch firstEpoch, lastEpoch := firstMap>>params.logMapsPerEpoch, lastMap>>params.logMapsPerEpoch
// build matcher according to the given filter criteria // build matcher according to the given filter criteria
matchers := make([]matcher, len(topics)+1) matchers := make([]matcher, len(topics)+1)
@ -178,13 +180,13 @@ func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
} }
// matcher is the final sequence matcher that signals a match when all underlying // matcher is the final sequence matcher that signals a match when all underlying
// matchers signal a match for consecutive log value indices. // matchers signal a match for consecutive log value indices.
matcher := newMatchSequence(matchers) matcher := newMatchSequence(params, matchers)
// processEpoch returns the potentially matching logs from the given epoch. // processEpoch returns the potentially matching logs from the given epoch.
processEpoch := func(epochIndex uint32) ([]*types.Log, error) { processEpoch := func(epochIndex uint32) ([]*types.Log, error) {
var logs []*types.Log var logs []*types.Log
// create a list of map indices to process // create a list of map indices to process
fm, lm := epochIndex<<logMapsPerEpoch, (epochIndex+1)<<logMapsPerEpoch-1 fm, lm := epochIndex<<params.logMapsPerEpoch, (epochIndex+1)<<params.logMapsPerEpoch-1
if fm < firstMap { if fm < firstMap {
fm = firstMap fm = firstMap
} }
@ -318,13 +320,14 @@ type singleMatcher struct {
// getMatches implements matcher // getMatches implements matcher
func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) { func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
params := s.backend.GetParams()
results := make([]potentialMatches, len(mapIndices)) results := make([]potentialMatches, len(mapIndices))
for i, mapIndex := range mapIndices { for i, mapIndex := range mapIndices {
filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, s.value)) filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, params.rowIndex(mapIndex>>params.logMapsPerEpoch, s.value))
if err != nil { if err != nil {
return nil, err return nil, err
} }
results[i] = filterRow.potentialMatches(mapIndex, s.value) results[i] = params.potentialMatches(filterRow, mapIndex, s.value)
} }
return results, nil return results, nil
} }
@ -403,6 +406,7 @@ func mergeResults(results []potentialMatches) potentialMatches {
// gives a match at X+offset. Note that matchSequence can be used recursively to // gives a match at X+offset. Note that matchSequence can be used recursively to
// detect any log value sequence. // detect any log value sequence.
type matchSequence struct { type matchSequence struct {
params *Params
base, next matcher base, next matcher
offset uint64 offset uint64
// *EmptyRate == totalCount << 32 + emptyCount (atomically accessed) // *EmptyRate == totalCount << 32 + emptyCount (atomically accessed)
@ -412,7 +416,7 @@ type matchSequence struct {
// newMatchSequence creates a recursive sequence matcher from a list of underlying // newMatchSequence creates a recursive sequence matcher from a list of underlying
// matchers. The resulting matcher signals a match at log value index X when each // matchers. The resulting matcher signals a match at log value index X when each
// underlying matcher matchers[i] returns a match at X+i. // underlying matcher matchers[i] returns a match at X+i.
func newMatchSequence(matchers []matcher) matcher { func newMatchSequence(params *Params, matchers []matcher) matcher {
if len(matchers) == 0 { if len(matchers) == 0 {
panic("zero length sequence matchers are not allowed") panic("zero length sequence matchers are not allowed")
} }
@ -420,7 +424,8 @@ func newMatchSequence(matchers []matcher) matcher {
return matchers[0] return matchers[0]
} }
return &matchSequence{ return &matchSequence{
base: newMatchSequence(matchers[:len(matchers)-1]), params: params,
base: newMatchSequence(params, matchers[:len(matchers)-1]),
next: matchers[len(matchers)-1], next: matchers[len(matchers)-1],
offset: uint64(len(matchers) - 1), offset: uint64(len(matchers) - 1),
} }
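The rule the recursion implements is easiest to see on plain sorted index lists. Below is a minimal standalone sketch (the helper matchSeq is hypothetical; wildcards and map boundaries are ignored). matchers[0] is the address matcher and matchers[1] the first topic matcher, and since a log's address value is immediately followed by its topic values, a sequence match needs the topic to match at the address index plus one:

package main

import "fmt"

// matchSeq sketches the sequence rule: a match is reported at index x when
// base matches at x and next matches at x+offset. Both inputs are sorted.
func matchSeq(base, next []uint64, offset uint64) []uint64 {
    var out []uint64
    j := 0
    for _, x := range base {
        for j < len(next) && next[j] < x+offset {
            j++
        }
        if j < len(next) && next[j] == x+offset {
            out = append(out, x)
        }
    }
    return out
}

func main() {
    address := []uint64{100, 250, 400} // potential matches of the address matcher
    topic0 := []uint64{101, 300, 401}  // potential matches of the first topic matcher
    fmt.Println(matchSeq(address, topic0, 1)) // [100 400]
}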
@ -461,7 +466,7 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]
nextIndices = append(nextIndices, mapIndex) nextIndices = append(nextIndices, mapIndex)
lastAdded = mapIndex lastAdded = mapIndex
} }
if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<logValuesPerMap)-m.offset { if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<m.params.logValuesPerMap)-m.offset {
nextIndices = append(nextIndices, mapIndex+1) nextIndices = append(nextIndices, mapIndex+1)
lastAdded = mapIndex + 1 lastAdded = mapIndex + 1
} }
@ -492,8 +497,8 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]
panic("invalid nextIndices") panic("invalid nextIndices")
} }
next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1] next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1]
if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<logValuesPerMap)+m.offset) || if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<m.params.logValuesPerMap)+m.offset) ||
next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<logValuesPerMap)+m.offset) { next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<m.params.logValuesPerMap)+m.offset) {
baseIndices = append(baseIndices, mapIndex) baseIndices = append(baseIndices, mapIndex)
} }
} }
@ -548,17 +553,17 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]
// match corresponding base and next matcher results // match corresponding base and next matcher results
results := make([]potentialMatches, len(mapIndices)) results := make([]potentialMatches, len(mapIndices))
for i, mapIndex := range mapIndices { for i, mapIndex := range mapIndices {
results[i] = matchSequenceResults(mapIndex, m.offset, baseResult(mapIndex), nextResult(mapIndex), nextResult(mapIndex+1)) results[i] = m.matchResults(mapIndex, m.offset, baseResult(mapIndex), nextResult(mapIndex), nextResult(mapIndex+1))
} }
return results, nil return results, nil
} }
// matchSequenceResults returns a list of sequence matches for the given mapIndex // matchResults returns a list of sequence matches for the given mapIndex and
// and offset based on the base matcher's results at mapIndex and the next matcher's // offset based on the base matcher's results at mapIndex and the next matcher's
// results at mapIndex and mapIndex+1. Note that acquiring nextNextRes may be // results at mapIndex and mapIndex+1. Note that acquiring nextNextRes may be
// skipped and it can be substituted with an empty list if baseRes has no potential // skipped and it can be substituted with an empty list if baseRes has no potential
// matches that could be sequence matched with anything that could be in nextNextRes. // matches that could be sequence matched with anything that could be in nextNextRes.
func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches { func (m *matchSequence) matchResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
if nextRes == nil || (baseRes != nil && len(baseRes) == 0) { if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
// if nextRes is a wild card or baseRes is empty then the sequence matcher // if nextRes is a wild card or baseRes is empty then the sequence matcher
// result equals baseRes. // result equals baseRes.
@ -568,7 +573,7 @@ func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, next
// discard items from nextRes whose corresponding base matcher results // discard items from nextRes whose corresponding base matcher results
// with the negative offset applied would be located at mapIndex-1. // with the negative offset applied would be located at mapIndex-1.
start := 0 start := 0
for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<logValuesPerMap+offset { for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<m.params.logValuesPerMap+offset {
start++ start++
} }
nextRes = nextRes[start:] nextRes = nextRes[start:]
@ -577,7 +582,7 @@ func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, next
// discard items from nextNextRes whose corresponding base matcher results // discard items from nextNextRes whose corresponding base matcher results
// with the negative offset applied would still be located at mapIndex+1. // with the negative offset applied would still be located at mapIndex+1.
stop := 0 stop := 0
for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<logValuesPerMap+offset { for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<m.params.logValuesPerMap+offset {
stop++ stop++
} }
nextNextRes = nextNextRes[:stop] nextNextRes = nextNextRes[:stop]
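As a worked example, take the default logValuesPerMap = 16, offset = 1 and mapIndex = 3: sequence positions X for this map fall in [3<<16, 4<<16). A nextRes entry below 3<<16 + 1 would put its base position X = entry - 1 into map 2, so it is dropped from the front; a nextNextRes entry at or above 4<<16 + 1 would put X into map 4, so only entries below that bound are kept.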

@ -32,32 +32,15 @@ func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend {
return fm return fm
} }
// updateMatchersValidRange iterates through active matchers and limits their // GetParams returns the filtermaps parameters.
// valid range with the current indexed range. This function should be called // GetParams implements MatcherBackend.
// whenever a part of the log index has been removed, before adding new blocks func (fm *FilterMapsMatcherBackend) GetParams() *Params {
// to it. return &fm.f.Params
func (f *FilterMaps) updateMatchersValidRange() {
for fm := range f.matchers {
if !f.initialized {
fm.valid = false
}
if !fm.valid {
continue
}
if fm.firstValid < f.tailBlockNumber {
fm.firstValid = f.tailBlockNumber
}
if fm.lastValid > f.headBlockNumber {
fm.lastValid = f.headBlockNumber
}
if fm.firstValid > fm.lastValid {
fm.valid = false
}
}
} }
// Close removes the matcher from the set of active matchers and ensures that // Close removes the matcher from the set of active matchers and ensures that
// any SyncLogIndex calls are cancelled. // any SyncLogIndex calls are cancelled.
// Close implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) Close() { func (fm *FilterMapsMatcherBackend) Close() {
fm.f.lock.Lock() fm.f.lock.Lock()
defer fm.f.lock.Unlock() defer fm.f.lock.Unlock()
@ -156,3 +139,27 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
return SyncRange{}, ctx.Err() return SyncRange{}, ctx.Err()
} }
} }
// updateMatchersValidRange iterates through active matchers and limits their
// valid range with the current indexed range. This function should be called
// whenever a part of the log index has been removed, before adding new blocks
// to it.
func (f *FilterMaps) updateMatchersValidRange() {
for fm := range f.matchers {
if !f.initialized {
fm.valid = false
}
if !fm.valid {
continue
}
if fm.firstValid < f.tailBlockNumber {
fm.firstValid = f.tailBlockNumber
}
if fm.lastValid > f.headBlockNumber {
fm.lastValid = f.headBlockNumber
}
if fm.firstValid > fm.lastValid {
fm.valid = false
}
}
}

@ -0,0 +1,180 @@
package filtermaps
import (
"crypto/sha256"
"encoding/binary"
"sort"
"github.com/ethereum/go-ethereum/common"
)
type Params struct {
logMapHeight uint // log2(mapHeight)
logMapsPerEpoch uint // log2(mapsPerEpoch)
logValuesPerMap uint // log2(valuesPerMap)
// derived fields
mapHeight uint32 // filter map height (number of rows)
mapsPerEpoch uint32 // number of maps in an epoch
valuesPerMap uint64 // number of log values marked on each filter map
}
var DefaultParams = Params{
logMapHeight: 12,
logMapsPerEpoch: 6,
logValuesPerMap: 16,
}
func (p *Params) deriveFields() {
p.mapHeight = uint32(1) << p.logMapHeight
p.mapsPerEpoch = uint32(1) << p.logMapsPerEpoch
p.valuesPerMap = uint64(1) << p.logValuesPerMap
}
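With the defaults above, deriveFields reproduces the dimensions that were previously package-level constants. A short in-package sketch of the intended call pattern (the identifiers are unexported, so this only compiles inside package filtermaps; NewFilterMaps performs the same steps internally):

    params := DefaultParams
    params.deriveFields()
    // params.mapHeight == 1<<12 == 4096 rows per filter map
    // params.mapsPerEpoch == 1<<6 == 64 maps per epoch
    // params.valuesPerMap == 1<<16 == 65536 log values marked on each map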
// addressValue returns the log value hash of a log emitting address.
func addressValue(address common.Address) common.Hash {
var result common.Hash
hasher := sha256.New()
hasher.Write(address[:])
hasher.Sum(result[:0])
return result
}
// topicValue returns the log value hash of a log topic.
func topicValue(topic common.Hash) common.Hash {
var result common.Hash
hasher := sha256.New()
hasher.Write(topic[:])
hasher.Sum(result[:0])
return result
}
// rowIndex returns the row index in which the given log value should be marked
// during the given epoch. Note that row assignments are re-shuffled in every
// epoch in order to ensure that even though there are always a few more heavily
// used rows due to very popular addresses and topics, these will not make search
// for other log values very expensive. Even if certain values are occasionally
// sorted into these heavy rows, in most of the epochs they are placed in average
// length rows.
func (p *Params) rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
hasher := sha256.New()
hasher.Write(logValue[:])
var indexEnc [4]byte
binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
hasher.Write(indexEnc[:])
var hash common.Hash
hasher.Sum(hash[:0])
return binary.LittleEndian.Uint32(hash[:4]) % p.mapHeight
}
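The per-epoch re-shuffling is easy to observe in isolation. The following standalone sketch mirrors rowIndex for the default mapHeight of 4096 (the names rowOf and v are local to the sketch); the same log value almost certainly lands in different rows in different epochs:

package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

// rowOf mirrors Params.rowIndex for mapHeight = 4096: hash the log value
// together with the epoch index, then reduce the first 4 bytes mod mapHeight.
func rowOf(epochIndex uint32, logValue [32]byte) uint32 {
    hasher := sha256.New()
    hasher.Write(logValue[:])
    var indexEnc [4]byte
    binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
    hasher.Write(indexEnc[:])
    var hash [32]byte
    hasher.Sum(hash[:0])
    return binary.LittleEndian.Uint32(hash[:4]) % 4096
}

func main() {
    var v [32]byte
    v[0] = 0xab // stand-in for an address or topic value hash
    fmt.Println(rowOf(0, v), rowOf(1, v), rowOf(2, v))
}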
// columnIndex returns the column index that should be added to the appropriate
// row in order to place a mark for the next log value.
func (p *Params) columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
x := uint32(lvIndex % p.valuesPerMap) // log value sub-index
transformHash := transformHash(uint32(lvIndex/p.valuesPerMap), logValue)
// apply column index transformation function
x += binary.LittleEndian.Uint32(transformHash[0:4])
x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
x ^= binary.LittleEndian.Uint32(transformHash[8:12])
x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
x += binary.LittleEndian.Uint32(transformHash[16:20])
x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
x ^= binary.LittleEndian.Uint32(transformHash[24:28])
x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
return x
}
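Every step of this transformation is invertible mod 2^32: adds and xors undo themselves and odd multipliers have modular inverses, so the log value sub-index can be recovered exactly. A standalone round-trip sketch, substituting a Newton-iteration inverse for uint32ModInverse (all names are local to the sketch):

package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

// inv returns the multiplicative inverse of an odd y mod 2^32 via Newton
// iteration; each step doubles the number of correct low bits.
func inv(y uint32) uint32 {
    x := y // y*y == 1 (mod 8), so x starts correct to 3 bits
    for i := 0; i < 4; i++ {
        x *= 2 - y*x
    }
    return x
}

func main() {
    // derive the per-map transform seed exactly like transformHash
    var logValue [32]byte
    logValue[0] = 0xab
    hasher := sha256.New()
    hasher.Write(logValue[:])
    var indexEnc [4]byte
    binary.LittleEndian.PutUint32(indexEnc[:], 7) // mapIndex = 7
    hasher.Write(indexEnc[:])
    var t [32]byte
    hasher.Sum(t[:0])

    add1 := binary.LittleEndian.Uint32(t[0:4])
    mul1 := binary.LittleEndian.Uint32(t[4:8])*2 + 1
    xor1 := binary.LittleEndian.Uint32(t[8:12])
    mul2 := binary.LittleEndian.Uint32(t[12:16])*2 + 1
    add2 := binary.LittleEndian.Uint32(t[16:20])
    mul3 := binary.LittleEndian.Uint32(t[20:24])*2 + 1
    xor2 := binary.LittleEndian.Uint32(t[24:28])
    mul4 := binary.LittleEndian.Uint32(t[28:32])*2 + 1

    // forward transformation, as in columnIndex
    x := uint32(12345) // log value sub-index
    c := x
    c += add1
    c *= mul1
    c ^= xor1
    c *= mul2
    c += add2
    c *= mul3
    c ^= xor2
    c *= mul4

    // reverse transformation, as in potentialMatches
    y := c
    y *= inv(mul4)
    y ^= xor2
    y *= inv(mul3)
    y -= add2
    y *= inv(mul2)
    y ^= xor1
    y *= inv(mul1)
    y -= add1
    fmt.Println(y == x) // true
}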
// transformHash calculates a hash specific to a given map and log value hash
// that defines a bijective function on the uint32 range. This function is used
// to transform the log value sub-index (distance from the first index of the map)
// into a 32 bit column index, then applied in reverse when searching for potential
// matches for a given log value.
func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
hasher := sha256.New()
hasher.Write(logValue[:])
var indexEnc [4]byte
binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
hasher.Write(indexEnc[:])
hasher.Sum(result[:0])
return
}
// potentialMatches returns the list of log value indices potentially matching
// the given log value hash in the range of the filter map the row belongs to.
// Note that the list of indices is always sorted and potential duplicates are
// removed. Though the column indices are stored in the same order they were
// added and therefore the true matches are automatically reverse transformed
// in the right order, false positives can ruin this property. Since these can
// only be separated from true matches after the combined pattern matching of the
// outputs of individual log value matchers and this pattern matcher assumes a
// sorted and duplicate-free list of indices, we should ensure these properties
// here.
func (p *Params) potentialMatches(row FilterRow, mapIndex uint32, logValue common.Hash) potentialMatches {
results := make(potentialMatches, 0, 8)
transformHash := transformHash(mapIndex, logValue)
sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
// perform reverse column index transformation on all column indices of the row.
// if a column index was added by the searched log value then the reverse
// transform will yield a valid log value sub-index of the given map.
// Column index is 32 bits long while there are valuesPerMap valid log value
// indices in the map's range, so this can also happen by accident with a
// valuesPerMap / 2^32 chance, in which case we have a false positive.
for _, columnIndex := range row {
if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < uint32(p.valuesPerMap) {
results = append(results, uint64(mapIndex)<<p.logValuesPerMap+uint64(potentialSubIndex))
}
}
sort.Sort(results)
// remove duplicates
j := 0
for i, match := range results {
if i == 0 || match != results[i-1] {
results[j] = results[i]
j++
}
}
return results[:j]
}
// potentialMatches is a strictly monotonically increasing list of log value
// indices in the range of a filter map that are potential matches for certain
// filter criteria.
// Note that nil is used as a wildcard and therefore means that all log value
// indices in the filter map range are potential matches. If there are no
// potential matches in the given map's range then an empty slice should be used.
type potentialMatches []uint64
// noMatches means there are no potential matches in a given filter map's range.
var noMatches = potentialMatches{}
func (p potentialMatches) Len() int { return len(p) }
func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
func (p potentialMatches) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// uint32ModInverse takes an odd 32 bit number and returns its modular
// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y
// x * y * uint32ModInverse(y) == x.
func uint32ModInverse(v uint32) uint32 {
if v&1 == 0 {
panic("uint32ModInverse called with even argument")
}
m := int64(1) << 32
m0 := m
a := int64(v)
x, y := int64(1), int64(0)
for a > 1 {
q := a / m
m, a = a%m, m
x, y = y, x-q*y
}
if x < 0 {
x += m0
}
return uint32(x)
}
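The inverse property is easy to cross-check against math/big as an oracle; a minimal standalone sketch:

package main

import (
    "fmt"
    "math/big"
)

func main() {
    // for odd y, y * uint32ModInverse(y) == 1 (mod 2^32), hence x*y*inv(y) == x
    mod := new(big.Int).Lsh(big.NewInt(1), 32)
    y := big.NewInt(0x12345671) // any odd value
    inv := new(big.Int).ModInverse(y, mod)
    fmt.Println(new(big.Int).Mod(new(big.Int).Mul(y, inv), mod)) // 1
}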

@ -8,14 +8,17 @@ import (
) )
func TestSingleMatch(t *testing.T) { func TestSingleMatch(t *testing.T) {
params := DefaultParams
params.deriveFields()
for count := 0; count < 100000; count++ { for count := 0; count < 100000; count++ {
// generate a row with a single random entry // generate a row with a single random entry
mapIndex := rand.Uint32() mapIndex := rand.Uint32()
lvIndex := uint64(mapIndex)<<logValuesPerMap + uint64(rand.Intn(valuesPerMap)) lvIndex := uint64(mapIndex)<<params.logValuesPerMap + uint64(rand.Intn(int(params.valuesPerMap)))
var lvHash common.Hash var lvHash common.Hash
rand.Read(lvHash[:]) rand.Read(lvHash[:])
row := FilterRow{columnIndex(lvIndex, lvHash)} row := FilterRow{params.columnIndex(lvIndex, lvHash)}
matches := row.potentialMatches(mapIndex, lvHash) matches := params.potentialMatches(row, mapIndex, lvHash)
// check if it has been reverse transformed correctly // check if it has been reverse transformed correctly
if len(matches) != 1 { if len(matches) != 1 {
t.Fatalf("Invalid length of matches (got %d, expected 1)", len(matches)) t.Fatalf("Invalid length of matches (got %d, expected 1)", len(matches))
@ -34,23 +37,26 @@ const (
) )
func TestPotentialMatches(t *testing.T) { func TestPotentialMatches(t *testing.T) {
params := DefaultParams
params.deriveFields()
var falsePositives int var falsePositives int
for count := 0; count < testPmCount; count++ { for count := 0; count < testPmCount; count++ {
mapIndex := rand.Uint32() mapIndex := rand.Uint32()
lvStart := uint64(mapIndex) << logValuesPerMap lvStart := uint64(mapIndex) << params.logValuesPerMap
var row FilterRow var row FilterRow
lvIndices := make([]uint64, testPmLen) lvIndices := make([]uint64, testPmLen)
lvHashes := make([]common.Hash, testPmLen+1) lvHashes := make([]common.Hash, testPmLen+1)
for i := range lvIndices { for i := range lvIndices {
// add testPmLen single entries with different log value hashes at different indices // add testPmLen single entries with different log value hashes at different indices
lvIndices[i] = lvStart + uint64(rand.Intn(valuesPerMap)) lvIndices[i] = lvStart + uint64(rand.Intn(int(params.valuesPerMap)))
rand.Read(lvHashes[i][:]) rand.Read(lvHashes[i][:])
row = append(row, columnIndex(lvIndices[i], lvHashes[i])) row = append(row, params.columnIndex(lvIndices[i], lvHashes[i]))
} }
// add the same log value hash at the first testPmLen log value indices of the map's range // add the same log value hash at the first testPmLen log value indices of the map's range
rand.Read(lvHashes[testPmLen][:]) rand.Read(lvHashes[testPmLen][:])
for lvIndex := lvStart; lvIndex < lvStart+testPmLen; lvIndex++ { for lvIndex := lvStart; lvIndex < lvStart+testPmLen; lvIndex++ {
row = append(row, columnIndex(lvIndex, lvHashes[testPmLen])) row = append(row, params.columnIndex(lvIndex, lvHashes[testPmLen]))
} }
// randomly duplicate some entries // randomly duplicate some entries
for i := 0; i < testPmLen; i++ { for i := 0; i < testPmLen; i++ {
@ -63,7 +69,7 @@ func TestPotentialMatches(t *testing.T) {
} }
// check retrieved matches while also counting false positives // check retrieved matches while also counting false positives
for i, lvHash := range lvHashes { for i, lvHash := range lvHashes {
matches := row.potentialMatches(mapIndex, lvHash) matches := params.potentialMatches(row, mapIndex, lvHash)
if i < testPmLen { if i < testPmLen {
// check single entry match // check single entry match
if len(matches) < 1 { if len(matches) < 1 {
@ -97,15 +103,17 @@ func TestPotentialMatches(t *testing.T) {
} }
} }
// Whenever looking for a certain log value hash, each entry in the row that // Whenever looking for a certain log value hash, each entry in the row that
// was generated by another log value hash (a "foreign entry") has an // was generated by another log value hash (a "foreign entry") has a
// 1 / valuesPerMap chance of yielding a false positive. // valuesPerMap / 2^32 chance of yielding a false positive if the reverse
// transformed 32 bit integer is by random chance less than valuesPerMap and
// is therefore considered a potentially valid match.
// We have testPmLen unique hash entries and a testPmLen long series of entries // We have testPmLen unique hash entries and a testPmLen long series of entries
// for the same hash. For each of the testPmLen unique hash entries there are // for the same hash. For each of the testPmLen unique hash entries there are
// testPmLen*2-1 foreign entries while for the long series there are testPmLen // testPmLen*2-1 foreign entries while for the long series there are testPmLen
// foreign entries. This means that after performing all these filtering runs, // foreign entries. This means that after performing all these filtering runs,
// we have processed 2*testPmLen^2 foreign entries, which gives us an estimate // we have processed 2*testPmLen^2 foreign entries, which gives us an estimate
// of how many false positives to expect. // of how many false positives to expect.
expFalse := testPmCount * testPmLen * testPmLen * 2 / valuesPerMap expFalse := int(uint64(testPmCount*testPmLen*testPmLen*2) * params.valuesPerMap >> 32)
if falsePositives < expFalse/2 || falsePositives > expFalse*3/2 { if falsePositives < expFalse/2 || falsePositives > expFalse*3/2 {
t.Fatalf("False positive rate out of expected range (got %d, expected %d +-50%%)", falsePositives, expFalse) t.Fatalf("False positive rate out of expected range (got %d, expected %d +-50%%)", falsePositives, expFalse)
} }

@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, config.LogHistory, config.LogNoHistory) eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, config.LogNoHistory)
if config.BlobPool.Datadir != "" { if config.BlobPool.Datadir != "" {
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
