core/filtermaps: two dimensional log filter

Zsolt Felfoldi 4 months ago
parent ae707445f5
commit 9ad34e5c69
  1. core/filtermaps/filtermaps.go (+582)
  2. core/filtermaps/indexer.go (+618)
  3. core/filtermaps/matcher.go (+500)
  4. core/rawdb/accessors_indexes.go (+206)
  5. core/rawdb/schema.go (+31)
  6. eth/api_backend.go (+2)
  7. eth/backend.go (+13)
  8. eth/filters/filter.go (+41)
  9. eth/filters/filter_system.go (+5)
  10. internal/ethapi/backend.go (+5)

@@ -0,0 +1,582 @@
package filtermaps
import (
"context"
"crypto/sha256"
"encoding/binary"
"errors"
"sort"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
const (
logMapHeight = 12 // log2(mapHeight)
mapHeight = 1 << logMapHeight // filter map height (number of rows)
logMapsPerEpoch = 6 // log2(mapsPerEpoch)
mapsPerEpoch = 1 << logMapsPerEpoch // number of maps in an epoch
logValuesPerMap = 16 // log2(valuesPerMap)
valuesPerMap = 1 << logValuesPerMap // number of log values marked on each filter map
headCacheSize = 8 // maximum number of recent filter maps cached in memory
)
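// A minimal sketch of how a log value index decomposes under the constants
// above: each map covers valuesPerMap (2^16) log values and each epoch covers
// mapsPerEpoch (2^6) maps, so an epoch spans 2^22 log value indices.
//
//	mapIndex := uint32(lvIndex >> logValuesPerMap) // filter map containing lvIndex
//	epochIndex := mapIndex >> logMapsPerEpoch      // epoch containing the map
//	subIndex := uint32(lvIndex % valuesPerMap)     // position within the map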
// FilterMaps is the in-memory representation of the log index structure that is
// responsible for building and updating the index according to the canonical
// chain.
// Note that FilterMaps implements the same data structure as proposed in EIP-7745
// without the tree hashing and consensus changes:
// https://eips.ethereum.org/EIPS/eip-7745
type FilterMaps struct {
lock sync.RWMutex
db ethdb.KeyValueStore
closeCh chan chan struct{}
filterMapsRange
chain *core.BlockChain
// filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified
// while updating the structure. Note that the set of cached maps depends
// only on filterMapsRange and rows of other maps are not cached here.
filterMapLock sync.Mutex
filterMapCache map[uint32]*filterMap
blockPtrCache *lru.Cache[uint32, uint64]
lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint
}
// filterMap is a full or partial in-memory representation of a filter map where
// rows are allowed to have a nil value meaning the row is not stored in the
// structure. Note that therefore a known empty row should be represented with
// a zero-length slice.
// It can be used as a memory cache or an overlay while preparing a batch of
// changes to the structure. In either case a nil value should be interpreted
// as transparent (uncached/unchanged).
type filterMap [mapHeight]FilterRow
// FilterRow encodes a single row of a filter map as a list of column indices.
// Note that the values are always stored in the same order as they were added
// and if the same column index is added twice, it is also stored twice.
// Order of column indices and potential duplications do not matter when searching
// for a value but leaving the original order makes reverting to a previous state
// simpler.
type FilterRow []uint32
// emptyRow represents an empty FilterRow. Note that in case of decoded FilterRows
// nil has a special meaning (transparent; not stored in the cache/overlay map)
// and therefore an empty row is represented by a zero length slice.
var emptyRow = FilterRow{}
// filterMapsRange describes the block range that has been indexed and the log
// value index range it has been mapped to.
type filterMapsRange struct {
initialized bool
headLvPointer, tailLvPointer uint64
headBlockNumber, tailBlockNumber uint64
headBlockHash, tailParentHash common.Hash
}
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain.
func NewFilterMaps(db ethdb.KeyValueStore, chain *core.BlockChain) *FilterMaps {
rs, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
}
fm := &FilterMaps{
db: db,
chain: chain,
closeCh: make(chan chan struct{}),
filterMapsRange: filterMapsRange{
initialized: rs.Initialized,
headLvPointer: rs.HeadLvPointer,
tailLvPointer: rs.TailLvPointer,
headBlockNumber: rs.HeadBlockNumber,
tailBlockNumber: rs.TailBlockNumber,
headBlockHash: rs.HeadBlockHash,
tailParentHash: rs.TailParentHash,
},
filterMapCache: make(map[uint32]*filterMap),
blockPtrCache: lru.NewCache[uint32, uint64](1000),
lvPointerCache: lru.NewCache[uint64, uint64](1000),
revertPoints: make(map[uint64]*revertPoint),
}
if !fm.initialized {
fm.resetDb()
}
fm.updateMapCache()
if rp, err := fm.newUpdateBatch().makeRevertPoint(); err == nil {
fm.revertPoints[rp.blockNumber] = rp
} else {
log.Error("Error creating head revert point", "error", err)
}
go fm.updateLoop()
return fm
}
// Close ensures that the indexer is fully stopped before returning.
func (f *FilterMaps) Close() {
ch := make(chan struct{})
f.closeCh <- ch
<-ch
}
// FilterMapsMatcherBackend implements MatcherBackend.
type FilterMapsMatcherBackend FilterMaps
// GetFilterMapRow returns the given row of the given map. If the row is empty
// then a non-nil zero length row is returned.
// Note that the returned slices should not be modified; they should be copied
// on write.
// GetFilterMapRow implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
f := (*FilterMaps)(ff)
return f.getFilterMapRow(mapIndex, rowIndex)
}
// GetBlockLvPointer returns the starting log value index where the log values
// generated by the given block are located. If blockNumber is beyond the current
// head then the first unoccupied log value index is returned.
// GetBlockLvPointer implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
f := (*FilterMaps)(ff)
f.lock.RLock()
defer f.lock.RUnlock()
return f.getBlockLvPointer(blockNumber)
}
// GetLogByLvIndex returns the log at the given log value index. If the index does
// not point to the first log value entry of a log then no log and no error are
// returned as this can happen when the log value index was a false positive.
// Note that this function assumes that the log index structure is consistent
// with the canonical chain at the point where the given log value index points.
// If this is not the case then an invalid result or an error may be returned.
// GetLogByLvIndex implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
f := (*FilterMaps)(ff)
f.lock.RLock()
defer f.lock.RUnlock()
return f.getLogByLvIndex(lvIndex)
}
// reset un-initializes the FilterMaps structure and removes all related data from
// the database.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) reset() {
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
f.resetDb()
f.filterMapsRange = filterMapsRange{}
f.filterMapCache = make(map[uint32]*filterMap)
f.revertPoints = make(map[uint64]*revertPoint)
f.blockPtrCache.Purge()
f.lvPointerCache.Purge()
}
// resetDb removes all log index data from the database.
func (f *FilterMaps) resetDb() {
var logged bool
for {
it := f.db.NewIterator(rawdb.FilterMapsPrefix, nil)
batch := f.db.NewBatch()
var count int
for ; count < 10000 && it.Next(); count++ {
batch.Delete(it.Key())
}
it.Release()
if count == 0 {
break
}
if !logged {
log.Info("Resetting log index database...")
logged = true
}
batch.Write()
}
if logged {
log.Info("Resetting log index database finished")
}
}
// setRange updates the covered range and also adds the changes to the given batch.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
f.filterMapsRange = newRange
rs := rawdb.FilterMapsRange{
Initialized: newRange.initialized,
HeadLvPointer: newRange.headLvPointer,
TailLvPointer: newRange.tailLvPointer,
HeadBlockNumber: newRange.headBlockNumber,
TailBlockNumber: newRange.tailBlockNumber,
HeadBlockHash: newRange.headBlockHash,
TailParentHash: newRange.tailParentHash,
}
rawdb.WriteFilterMapsRange(batch, rs)
f.updateMapCache()
}
// updateMapCache updates the maps covered by the filterMapCache according to the
// covered range.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) updateMapCache() {
if !f.initialized {
return
}
f.filterMapLock.Lock()
defer f.filterMapLock.Unlock()
newFilterMapCache := make(map[uint32]*filterMap)
firstMap, afterLastMap := uint32(f.tailLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
headCacheFirst := firstMap + 1
if afterLastMap > headCacheFirst+headCacheSize {
headCacheFirst = afterLastMap - headCacheSize
}
fm := f.filterMapCache[firstMap]
if fm == nil {
fm = new(filterMap)
}
newFilterMapCache[firstMap] = fm
for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ {
fm := f.filterMapCache[mapIndex]
if fm == nil {
fm = new(filterMap)
}
newFilterMapCache[mapIndex] = fm
}
f.filterMapCache = newFilterMapCache
}
// getLogByLvIndex returns the log at the given log value index. If the index does
// not point to the first log value entry of a log then no log and no error are
// returned as this can happen when the log value index was a false positive.
// Note that this function assumes that the log index structure is consistent
// with the canonical chain at the point where the given log value index points.
// If this is not the case then an invalid result or an error may be returned.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
return nil, errors.New("log value index outside available range")
}
// find possible block range based on map to block pointers
mapIndex := uint32(lvIndex >> logValuesPerMap)
firstBlockNumber, err := f.getMapBlockPtr(mapIndex)
if err != nil {
return nil, err
}
var lastBlockNumber uint64
if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) {
lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
if err != nil {
return nil, err
}
} else {
lastBlockNumber = f.headBlockNumber
}
// find block with binary search based on block to log value index pointers
for firstBlockNumber < lastBlockNumber {
midBlockNumber := (firstBlockNumber + lastBlockNumber + 1) / 2
midLvPointer, err := f.getBlockLvPointer(midBlockNumber)
if err != nil {
return nil, err
}
if lvIndex < midLvPointer {
lastBlockNumber = midBlockNumber - 1
} else {
firstBlockNumber = midBlockNumber
}
}
// get block receipts
hash := f.chain.GetCanonicalHash(firstBlockNumber)
receipts := f.chain.GetReceiptsByHash(hash) //TODO small cache
if receipts == nil {
return nil, errors.New("receipts not found")
}
lvPointer, err := f.getBlockLvPointer(firstBlockNumber)
if err != nil {
return nil, err
}
// iterate through receipts to find the exact log starting at lvIndex
for _, receipt := range receipts {
for _, log := range receipt.Logs {
if lvPointer > lvIndex {
// lvIndex does not point to the first log value (address value)
// generated by a log as true matches should always do, so it
// is considered a false positive (no log and no error returned).
return nil, nil
}
if lvPointer == lvIndex {
return log, nil // potential match
}
lvPointer += uint64(len(log.Topics) + 1)
}
}
return nil, errors.New("log value index not found")
}
// getFilterMapRow returns the given row of the given map. If the row is empty
// then a non-nil zero length row is returned.
// Note that the returned slices should not be modified; they should be copied
// on write.
func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, error) {
f.filterMapLock.Lock()
defer f.filterMapLock.Unlock()
fm := f.filterMapCache[mapIndex]
if fm != nil && fm[rowIndex] != nil {
return fm[rowIndex], nil
}
row, err := rawdb.ReadFilterMapRow(f.db, mapRowIndex(mapIndex, rowIndex))
if err != nil {
return nil, err
}
if fm != nil {
fm[rowIndex] = FilterRow(row)
}
return FilterRow(row), nil
}
// storeFilterMapRow stores a row at the given row index of the given map and also
// caches it in filterMapCache if the given map is cached.
// Note that empty rows are not stored in the database and therefore there is no
// separate delete function; deleting a row is the same as storing an empty row.
func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uint32, row FilterRow) {
f.filterMapLock.Lock()
defer f.filterMapLock.Unlock()
if fm := f.filterMapCache[mapIndex]; fm != nil {
(*fm)[rowIndex] = row
}
rawdb.WriteFilterMapRow(batch, mapRowIndex(mapIndex, rowIndex), []uint32(row))
}
// mapRowIndex calculates the unified storage index where the given row of the
// given map is stored. Note that this indexing scheme is the same as the one
// proposed in EIP-7745 for tree-hashing the filter map structure and for the
// same data proximity reasons it is also suitable for database representation.
// See also:
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
func mapRowIndex(mapIndex, rowIndex uint32) uint64 {
epochIndex, mapSubIndex := mapIndex>>logMapsPerEpoch, mapIndex%mapsPerEpoch
return (uint64(epochIndex)<<logMapHeight+uint64(rowIndex))<<logMapsPerEpoch + uint64(mapSubIndex)
}
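// A small worked example of the scheme above: within one epoch, the same row
// of consecutive maps occupies consecutive storage indices:
//
//	mapRowIndex(64, 5) == ((1<<logMapHeight)+5)<<logMapsPerEpoch // epoch 1, map sub-index 0
//	mapRowIndex(65, 5) == mapRowIndex(64, 5) + 1                 // same row, next map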
// getBlockLvPointer returns the starting log value index where the log values
// generated by the given block are located. If blockNumber is beyond the current
// head then the first unoccupied log value index is returned.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
if blockNumber > f.headBlockNumber {
return f.headLvPointer, nil
}
if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
return lvPointer, nil
}
lvPointer, err := rawdb.ReadBlockLvPointer(f.db, blockNumber)
if err != nil {
return 0, err
}
f.lvPointerCache.Add(blockNumber, lvPointer)
return lvPointer, nil
}
// storeBlockLvPointer stores the starting log value index where the log values
// generated by the given block are located.
func (f *FilterMaps) storeBlockLvPointer(batch ethdb.Batch, blockNumber, lvPointer uint64) {
f.lvPointerCache.Add(blockNumber, lvPointer)
rawdb.WriteBlockLvPointer(batch, blockNumber, lvPointer)
}
// deleteBlockLvPointer deletes the starting log value index where the log values
// generated by the given block are located.
func (f *FilterMaps) deleteBlockLvPointer(batch ethdb.Batch, blockNumber uint64) {
f.lvPointerCache.Remove(blockNumber)
rawdb.DeleteBlockLvPointer(batch, blockNumber)
}
// getMapBlockPtr returns the number of the block that generated the first log
// value entry of the given map.
func (f *FilterMaps) getMapBlockPtr(mapIndex uint32) (uint64, error) {
if blockPtr, ok := f.blockPtrCache.Get(mapIndex); ok {
return blockPtr, nil
}
blockPtr, err := rawdb.ReadFilterMapBlockPtr(f.db, mapIndex)
if err != nil {
return 0, err
}
f.blockPtrCache.Add(mapIndex, blockPtr)
return blockPtr, nil
}
// storeMapBlockPtr stores the number of the block that generated the first log
// value entry of the given map.
func (f *FilterMaps) storeMapBlockPtr(batch ethdb.Batch, mapIndex uint32, blockPtr uint64) {
f.blockPtrCache.Add(mapIndex, blockPtr)
rawdb.WriteFilterMapBlockPtr(batch, mapIndex, blockPtr)
}
// deleteMapBlockPtr deletes the number of the block that generated the first log
// value entry of the given map.
func (f *FilterMaps) deleteMapBlockPtr(batch ethdb.Batch, mapIndex uint32) {
f.blockPtrCache.Remove(mapIndex)
rawdb.DeleteFilterMapBlockPtr(batch, mapIndex)
}
// addressValue returns the log value hash of a log emitting address.
func addressValue(address common.Address) common.Hash {
var result common.Hash
hasher := sha256.New()
hasher.Write(address[:])
hasher.Sum(result[:0])
return result
}
// topicValue returns the log value hash of a log topic.
func topicValue(topic common.Hash) common.Hash {
var result common.Hash
hasher := sha256.New()
hasher.Write(topic[:])
hasher.Sum(result[:0])
return result
}
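// A minimal sketch of how a single log is mapped to log values: it occupies
// len(log.Topics)+1 consecutive log value indices, the address value first,
// followed by the topic values (see iterateReceipts in indexer.go):
//
//	values := []common.Hash{addressValue(log.Address)}
//	for _, topic := range log.Topics {
//		values = append(values, topicValue(topic))
//	}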
// rowIndex returns the row index in which the given log value should be marked
// during the given epoch. Note that row assignments are re-shuffled in every
// epoch in order to ensure that even though there are always a few heavily
// used rows due to very popular addresses and topics, these will not make
// searching for other log values very expensive. Even if certain values are
// occasionally sorted into these heavy rows, in most epochs they are placed in
// rows of average length.
func rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
hasher := sha256.New()
hasher.Write(logValue[:])
var indexEnc [4]byte
binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
hasher.Write(indexEnc[:])
var hash common.Hash
hasher.Sum(hash[:0])
return binary.LittleEndian.Uint32(hash[:4]) % mapHeight
}
// columnIndex returns the column index that should be added to the appropriate
// row in order to place a mark for the next log value.
func columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
x := uint32(lvIndex % valuesPerMap) // log value sub-index
transformHash := transformHash(uint32(lvIndex/valuesPerMap), logValue)
// apply column index transformation function
x += binary.LittleEndian.Uint32(transformHash[0:4])
x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
x ^= binary.LittleEndian.Uint32(transformHash[8:12])
x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
x += binary.LittleEndian.Uint32(transformHash[16:20])
x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
x ^= binary.LittleEndian.Uint32(transformHash[24:28])
x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
return x
}
// transformHash calculates a hash specific to a given map and log value hash
// that defines a bijective function on the uint32 range. This function is used
// to transform the log value sub-index (distance from the first index of the map)
// into a 32 bit column index, then applied in reverse when searching for potential
// matches for a given log value.
func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
hasher := sha256.New()
hasher.Write(logValue[:])
var indexEnc [4]byte
binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
hasher.Write(indexEnc[:])
hasher.Sum(result[:0])
return
}
// potentialMatches returns the list of log value indices potentially matching
// the given log value hash in the range of the filter map the row belongs to.
// Note that the list of indices is always sorted and potential duplicates are
// removed. Though the column indices are stored in the same order they were
// added and therefore the true matches are automatically reverse transformed
// in the right order, false positives can ruin this property. Since these can
// only be separated from true matches after the combined pattern matching of the
// outputs of individual log value matchers and this pattern matcher assumes a
// sorted and duplicate-free list of indices, we should ensure these properties
// here.
func (row FilterRow) potentialMatches(mapIndex uint32, logValue common.Hash) potentialMatches {
results := make(potentialMatches, 0, 8)
transformHash := transformHash(mapIndex, logValue)
sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
// perform reverse column index transformation on all column indices of the row.
// if a column index was added by the searched log value then the reverse
// transform will yield a valid log value sub-index of the given map.
// Column index is 32 bits long while there are 2**16 valid log value indices
// in the map's range, so this can also happen by accident with 1 in 2**16
// chance, in which case we have a false positive.
for _, columnIndex := range row {
if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < valuesPerMap {
results = append(results, uint64(mapIndex)*valuesPerMap+uint64(potentialSubIndex))
}
}
sort.Sort(results)
// remove duplicates
j := 0
for i, match := range results {
if i == 0 || match != results[i-1] {
results[j] = results[i]
j++
}
}
return results[:j]
}
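// A minimal round-trip sketch (mapIndex and logValue are stand-ins): since the
// reverse transform above inverts columnIndex, a mark placed for logValue is
// always recovered, possibly along with rare false positives:
//
//	lvIndex := uint64(mapIndex)*valuesPerMap + 123
//	row := FilterRow{columnIndex(lvIndex, logValue)}
//	matches := row.potentialMatches(mapIndex, logValue)
//	// matches contains lvIndex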
// potentialMatches is a strictly monotonically increasing list of log value
// indices in the range of a filter map that are potential matches for certain
// filter criteria.
// Note that nil is used as a wildcard and therefore means that all log value
// indices in the filter map range are potential matches. If there are no
// potential matches in the given map's range then an empty slice should be used.
type potentialMatches []uint64
// noMatches means there are no potential matches in a given filter map's range.
var noMatches = potentialMatches{}
func (p potentialMatches) Len() int { return len(p) }
func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
func (p potentialMatches) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// uint32ModInverse takes an odd 32 bit number and returns its modular
// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y,
// x * y * uint32ModInverse(y) == x.
func uint32ModInverse(v uint32) uint32 {
if v&1 == 0 {
panic("uint32ModInverse called with even argument")
}
m := int64(1) << 32
m0 := m
a := int64(v)
x, y := int64(1), int64(0)
for a > 1 {
q := a / m
m, a = a%m, m
x, y = y, x-q*y
}
if x < 0 {
x += m0
}
return uint32(x)
}
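// A minimal check of the property above, relying on uint32 multiplication
// wrapping modulo 2**32 (the x == 1 case of the identity):
//
//	for _, v := range []uint32{1, 3, 0x87654321} {
//		if v*uint32ModInverse(v) != 1 {
//			panic("not a modular inverse")
//		}
//	}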

@@ -0,0 +1,618 @@
package filtermaps
import (
"errors"
"math"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
const (
startLvPointer = valuesPerMap << 31 // log value index assigned to init block
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory
)
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
headEventCh := make(chan core.ChainHeadEvent)
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
defer sub.Unsubscribe()
head := f.chain.CurrentBlock()
if head == nil {
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case ch := <-f.closeCh:
close(ch)
return
}
}
fmr := f.getRange()
var stop bool
wait := func() {
if stop {
return
}
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-time.After(time.Second * 20):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
case ch := <-f.closeCh:
close(ch)
stop = true
}
}
for !stop {
if !fmr.initialized {
f.tryInit(head)
fmr = f.getRange()
if !fmr.initialized {
wait()
continue
}
}
// log index is initialized
if fmr.headBlockHash != head.Hash() {
f.tryUpdateHead(head)
fmr = f.getRange()
if fmr.headBlockHash != head.Hash() {
wait()
continue
}
}
// log index head is at latest chain head; process tail blocks if possible
f.tryExtendTail(func() bool {
// return true if tail processing needs to be stopped
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case ch := <-f.closeCh:
close(ch)
stop = true
return true
default:
head = f.chain.CurrentBlock()
}
// stop if there is a new chain head (always prioritize head updates)
return fmr.headBlockHash != head.Hash()
})
if fmr.headBlockHash == head.Hash() {
// if tail processing exited while there is no new head then no more
// tail blocks can be processed
wait()
}
}
}
// getRange returns the current filterMapsRange.
func (f *FilterMaps) getRange() filterMapsRange {
f.lock.RLock()
defer f.lock.RUnlock()
return f.filterMapsRange
}
// tryInit attempts to initialize the log index structure.
func (f *FilterMaps) tryInit(head *types.Header) {
receipts := f.chain.GetReceiptsByHash(head.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
return
}
update := f.newUpdateBatch()
if err := update.initWithBlock(head, receipts); err != nil {
log.Error("Could not initialize log index", "error", err)
}
f.applyUpdateBatch(update)
}
// tryUpdateHead attempts to update the log index with a new head. If necessary,
// it reverts to a common ancestor with the old head before adding new block logs.
// If no suitable revert point is available (probably a reorg just after init)
// then it resets the index and tries to re-initialize with the new head.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
// iterate back from new head until the log index head or a revert point and
// collect headers of blocks to be added
var (
newHeaders []*types.Header
chainPtr = newHead
rp *revertPoint
)
for {
if rp == nil || chainPtr.Number.Uint64() < rp.blockNumber {
var err error
rp, err = f.getRevertPoint(chainPtr.Number.Uint64())
if err != nil {
log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err)
return
}
if rp == nil {
// there are no more revert points available so we should reset and re-initialize
log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64())
f.reset()
f.tryInit(newHead)
return
}
}
if chainPtr.Hash() == rp.blockHash {
// revert point found at an ancestor of the new head
break
}
// keep iterating backwards and collecting headers
newHeaders = append(newHeaders, chainPtr)
parentHash, parentNumber := chainPtr.ParentHash, chainPtr.Number.Uint64()-1
chainPtr = f.chain.GetHeader(parentHash, parentNumber)
if chainPtr == nil {
log.Error("Canonical header not found", "number", parentNumber, "hash", parentHash)
return
}
}
if rp.blockHash != f.headBlockHash {
if rp.blockNumber+128 <= f.headBlockNumber {
log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", chainPtr.Number.Uint64())
}
if err := f.revertTo(rp); err != nil {
log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err)
return
}
}
if newHeaders == nil {
return
}
// add logs of new blocks in reverse order
update := f.newUpdateBatch()
for i := len(newHeaders) - 1; i >= 0; i-- {
newHeader := newHeaders[i]
receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
break
}
if err := update.addBlockToHead(newHeader, receipts); err != nil {
log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
break
}
if update.updatedRangeLength() >= mapsPerEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
update = f.newUpdateBatch()
}
}
f.applyUpdateBatch(update)
}
// tryExtendTail attempts to extend the log index backwards until it indexes the
// genesis block or cannot find more block receipts. Since this is a long process,
// stopFn is called after adding each tail block and if it returns true, the
// latest batch is written and the function returns.
func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
fmr := f.getRange()
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
if number == 0 {
return
}
update := f.newUpdateBatch()
lastTailEpoch := update.tailEpoch()
for number > 0 && !stopFn() {
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
update = f.newUpdateBatch()
lastTailEpoch = tailEpoch
}
newTail := f.chain.GetHeader(parentHash, number-1)
if newTail == nil {
log.Error("Tail header not found", "number", number-1, "hash", parentHash)
break
}
receipts := f.chain.GetReceiptsByHash(newTail.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash())
break
}
if err := update.addBlockToTail(newTail, receipts); err != nil {
log.Error("Error adding tail block", "number", newTail.Number, "hash", newTail.Hash(), "error", err)
break
}
number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
}
f.applyUpdateBatch(update)
}
// updateBatch is a memory overlay collecting changes to the log index structure
// that can be written to the database in a single batch while the in-memory
// representations in FilterMaps are also updated.
type updateBatch struct {
filterMapsRange
maps map[uint32]*filterMap // nil rows are unchanged
getFilterMapRow func(mapIndex, rowIndex uint32) (FilterRow, error)
blockLvPointer map[uint64]uint64 // removedPointer means delete
mapBlockPtr map[uint32]uint64 // removedPointer means delete
revertPoints map[uint64]*revertPoint
firstMap, afterLastMap uint32
}
// newUpdateBatch creates a new updateBatch.
func (f *FilterMaps) newUpdateBatch() *updateBatch {
f.lock.RLock()
defer f.lock.RUnlock()
return &updateBatch{
filterMapsRange: f.filterMapsRange,
maps: make(map[uint32]*filterMap),
getFilterMapRow: f.getFilterMapRow,
blockLvPointer: make(map[uint64]uint64),
mapBlockPtr: make(map[uint32]uint64),
revertPoints: make(map[uint64]*revertPoint),
}
}
// applyUpdateBatch creates a write batch, writes all changes to the database
// and also updates the in-memory representations of log index data.
func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
f.lock.Lock()
defer f.lock.Unlock()
batch := f.db.NewBatch()
// write or remove block-to-log-value-index pointers
for blockNumber, lvPointer := range u.blockLvPointer {
if lvPointer != removedPointer {
f.storeBlockLvPointer(batch, blockNumber, lvPointer)
} else {
f.deleteBlockLvPointer(batch, blockNumber)
}
}
// write or remove filter-map-to-block-number pointers
for mapIndex, blockNumber := range u.mapBlockPtr {
if blockNumber != removedPointer {
f.storeMapBlockPtr(batch, mapIndex, blockNumber)
} else {
f.deleteMapBlockPtr(batch, mapIndex)
}
}
// write filter map rows
for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ {
if fm := u.maps[mapIndex]; fm != nil {
if row := (*fm)[rowIndex]; row != nil {
f.storeFilterMapRow(batch, mapIndex, rowIndex, row)
}
}
}
}
// delete removed revert points from the database
if u.headBlockNumber < f.headBlockNumber {
for b := u.headBlockNumber + 1; b <= f.headBlockNumber; b++ {
delete(f.revertPoints, b)
if b%revertPointFrequency == 0 {
rawdb.DeleteRevertPoint(batch, b)
}
}
}
// delete removed revert points from the memory cache
if u.headBlockNumber > f.headBlockNumber {
for b := f.headBlockNumber + 1; b <= u.headBlockNumber; b++ {
delete(f.revertPoints, b-cachedRevertPoints)
}
}
// store new revert points in database and/or memory
for b, rp := range u.revertPoints {
if b+cachedRevertPoints > u.headBlockNumber {
f.revertPoints[b] = rp
}
if b%revertPointFrequency == 0 {
rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{
BlockHash: rp.blockHash,
MapIndex: rp.mapIndex,
RowLength: rp.rowLength[:],
})
}
}
// update filterMapsRange
f.setRange(batch, u.filterMapsRange)
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailLvPointer)
}
// updatedRangeLength returns the length of the updated filter map range.
func (u *updateBatch) updatedRangeLength() uint32 {
return u.afterLastMap - u.firstMap
}
// tailEpoch returns the tail epoch index.
func (u *updateBatch) tailEpoch() uint32 {
return uint32(u.tailLvPointer >> (logValuesPerMap + logMapsPerEpoch))
}
// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
// did not have a modified version of the given row yet, it is retrieved using the
// request function from the backing FilterMaps cache or database and copied
// before modification.
func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
fm := u.maps[mapIndex]
if fm == nil {
fm = new(filterMap)
u.maps[mapIndex] = fm
if mapIndex < u.firstMap || u.afterLastMap == 0 {
u.firstMap = mapIndex
}
if mapIndex >= u.afterLastMap {
u.afterLastMap = mapIndex + 1
}
}
rowPtr := &(*fm)[rowIndex]
if *rowPtr == nil {
if filterRow, err := u.getFilterMapRow(mapIndex, rowIndex); err == nil {
// filterRow is read only, copy before write
*rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8)
copy(*rowPtr, filterRow)
} else {
return nil, err
}
}
return rowPtr, nil
}
// initWithBlock initializes the log index with the given block as head.
func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts) error {
if u.initialized {
return errors.New("already initialized")
}
u.initialized = true
u.headLvPointer, u.tailLvPointer = startLvPointer, startLvPointer
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() //TODO genesis?
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
u.addBlockToHead(header, receipts)
return nil
}
// addValueToHead adds a single log value to the head of the log index.
func (u *updateBatch) addValueToHead(logValue common.Hash) error {
mapIndex := uint32(u.headLvPointer >> logValuesPerMap)
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
if err != nil {
return err
}
column := columnIndex(u.headLvPointer, logValue)
*rowPtr = append(*rowPtr, column)
u.headLvPointer++
return nil
}
// addBlockToHead adds the logs of the given block to the head of the log index.
// It also adds the block-to-log-value-index and filter-map-to-block pointers
// and creates a new revert point.
func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receipts) error {
if !u.initialized {
return errors.New("not initialized")
}
if header.ParentHash != u.headBlockHash {
return errors.New("addBlockToHead parent mismatch")
}
number := header.Number.Uint64()
u.blockLvPointer[number] = u.headLvPointer
startMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
if err := iterateReceipts(receipts, u.addValueToHead); err != nil {
return err
}
stopMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
for m := startMap; m < stopMap; m++ {
u.mapBlockPtr[m] = number
}
u.headBlockNumber, u.headBlockHash = number, header.Hash()
if (u.headBlockNumber-cachedRevertPoints)%revertPointFrequency != 0 {
delete(u.revertPoints, u.headBlockNumber-cachedRevertPoints)
}
if rp, err := u.makeRevertPoint(); err != nil {
return err
} else if rp != nil {
u.revertPoints[u.headBlockNumber] = rp
}
return nil
}
// addValueToTail adds a single log value to the tail of the log index.
func (u *updateBatch) addValueToTail(logValue common.Hash) error {
if u.tailLvPointer == 0 {
return errors.New("tail log value pointer underflow")
}
u.tailLvPointer--
mapIndex := uint32(u.tailLvPointer >> logValuesPerMap)
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
if err != nil {
return err
}
column := columnIndex(u.tailLvPointer, logValue)
*rowPtr = append(*rowPtr, 0)
copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
(*rowPtr)[0] = column
return nil
}
// addBlockToTail adds the logs of the given block to the tail of the log index.
// It also adds the block-to-log-value-index and filter-map-to-block pointers.
func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receipts) error {
if !u.initialized {
return errors.New("not initialized")
}
if header.Hash() != u.tailParentHash {
return errors.New("addBlockToTail parent mismatch")
}
number := header.Number.Uint64()
stopMap := uint32((u.tailLvPointer + valuesPerMap - 1) >> logValuesPerMap)
var cnt int
if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error {
cnt++
return u.addValueToTail(lv)
}); err != nil {
return err
}
startMap := uint32(u.tailLvPointer >> logValuesPerMap)
for m := startMap; m < stopMap; m++ {
u.mapBlockPtr[m] = number
}
u.blockLvPointer[number] = u.tailLvPointer
u.tailBlockNumber, u.tailParentHash = number, header.ParentHash
return nil
}
// iterateReceipts iterates the given block receipts, generates log value hashes
// and passes them to the given callback function as a parameter.
func iterateReceipts(receipts types.Receipts, valueCb func(common.Hash) error) error {
for _, receipt := range receipts {
for _, log := range receipt.Logs {
if err := valueCb(addressValue(log.Address)); err != nil {
return err
}
for _, topic := range log.Topics {
if err := valueCb(topicValue(topic)); err != nil {
return err
}
}
}
}
return nil
}
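// A minimal sketch of how the number of log value indices occupied by a block
// can be derived: it is the number of values emitted by iterateReceipts, i.e.
// the sum of len(log.Topics)+1 over all logs of the block:
//
//	var count uint64
//	iterateReceipts(receipts, func(common.Hash) error { count++; return nil })
//	// adding the block to the head advances headLvPointer by count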
// iterateReceiptsReverse iterates the given block receipts, generates log value
// hashes in reverse order and passes them to the given callback function as a
// parameter.
func iterateReceiptsReverse(receipts types.Receipts, valueCb func(common.Hash) error) error {
for i := len(receipts) - 1; i >= 0; i-- {
logs := receipts[i].Logs
for j := len(logs) - 1; j >= 0; j-- {
log := logs[j]
for k := len(log.Topics) - 1; k >= 0; k-- {
if err := valueCb(topicValue(log.Topics[k])); err != nil {
return err
}
}
if err := valueCb(addressValue(log.Address)); err != nil {
return err
}
}
}
return nil
}
// revertPoint can be used to revert the log index to a certain head block.
type revertPoint struct {
blockNumber uint64
blockHash common.Hash
mapIndex uint32
rowLength [mapHeight]uint
}
// makeRevertPoint creates a new revertPoint.
func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
rp := &revertPoint{
blockNumber: u.headBlockNumber,
blockHash: u.headBlockHash,
mapIndex: uint32(u.headLvPointer >> logValuesPerMap),
}
if u.tailLvPointer > uint64(rp.mapIndex)<<logValuesPerMap {
return nil, nil
}
for i := range rp.rowLength[:] {
var row FilterRow
if m := u.maps[rp.mapIndex]; m != nil {
row = (*m)[i]
}
if row == nil {
var err error
row, err = u.getFilterMapRow(rp.mapIndex, uint32(i))
if err != nil {
return nil, err
}
}
rp.rowLength[i] = uint(len(row))
}
return rp, nil
}
// getRevertPoint retrieves the latest revert point at or before the given block
// number from memory cache or from the database if available. If no such revert
// point is available then it returns no result and no error.
func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
if blockNumber > f.headBlockNumber {
blockNumber = f.headBlockNumber
}
if rp := f.revertPoints[blockNumber]; rp != nil {
return rp, nil
}
blockNumber -= blockNumber % revertPointFrequency
rps, err := rawdb.ReadRevertPoint(f.db, blockNumber)
if err != nil {
return nil, err
}
if rps == nil {
return nil, nil
}
if len(rps.RowLength) != mapHeight {
return nil, errors.New("invalid number of rows in stored revert point")
}
rp := &revertPoint{
blockNumber: blockNumber,
blockHash: rps.BlockHash,
mapIndex: rps.MapIndex,
}
copy(rp.rowLength[:], rps.RowLength)
return rp, nil
}
// revertTo reverts the log index to the given revert point.
func (f *FilterMaps) revertTo(rp *revertPoint) error {
batch := f.db.NewBatch()
afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
if rp.mapIndex >= afterLastMap {
return errors.New("cannot revert (head map behind revert point)")
}
lvPointer := uint64(rp.mapIndex) << logValuesPerMap
for rowIndex, rowLen := range rp.rowLength[:] {
rowIndex := uint32(rowIndex)
row, err := f.getFilterMapRow(rp.mapIndex, rowIndex)
if err != nil {
return err
}
if uint(len(row)) < rowLen {
return errors.New("cannot revert (row too short)")
}
if uint(len(row)) > rowLen {
f.storeFilterMapRow(batch, rp.mapIndex, rowIndex, row[:rowLen])
}
for mapIndex := rp.mapIndex + 1; mapIndex < afterLastMap; mapIndex++ {
f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
}
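// each log value adds exactly one mark to exactly one row of its map, so
// summing the retained row lengths recovers the number of log values kept
// in this map and thereby the reverted head log value pointer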
lvPointer += uint64(rowLen)
}
for mapIndex := rp.mapIndex + 1; mapIndex < afterLastMap; mapIndex++ {
f.deleteMapBlockPtr(batch, mapIndex)
}
for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ {
f.deleteBlockLvPointer(batch, blockNumber)
}
newRange := f.filterMapsRange
newRange.headLvPointer = lvPointer
newRange.headBlockNumber = rp.blockNumber
newRange.headBlockHash = rp.blockHash
f.setRange(batch, newRange)
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
return nil
}

@@ -0,0 +1,500 @@
package filtermaps
import (
"context"
"math"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// MatcherBackend defines the functions required for searching in the log index
// data structure. It is currently implemented by FilterMapsMatcherBackend but
// once EIP-7745 is implemented and active, these functions can also be trustlessly
// served by a remote prover.
type MatcherBackend interface {
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
}
// GetPotentialMatches returns a list of logs that are potential matches for the
// given filter criteria. Note that the returned list may still contain false
// positives.
//TODO add protection against reorgs during search
func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
// find the log value index range to search
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
if err != nil {
return nil, err
}
lastIndex, err := backend.GetBlockLvPointer(ctx, lastBlock+1)
if err != nil {
return nil, err
}
if lastIndex > 0 {
lastIndex--
}
firstMap, lastMap := uint32(firstIndex>>logValuesPerMap), uint32(lastIndex>>logValuesPerMap)
firstEpoch, lastEpoch := firstMap>>logMapsPerEpoch, lastMap>>logMapsPerEpoch
// build matcher according to the given filter criteria
matchers := make([]matcher, len(topics)+1)
// matchAddress signals a match when there is a match for any of the given
// addresses.
// If the list of addresses is empty then it creates a "wild card" matcher
// that signals every index as a potential match.
matchAddress := make(matchAny, len(addresses))
for i, address := range addresses {
matchAddress[i] = &singleMatcher{backend: backend, value: addressValue(address)}
}
matchers[0] = matchAddress
for i, topicList := range topics {
// matchTopic signals a match when there is a match for any of the topics
// specified for the given position (topicList).
// If topicList is empty then it creates a "wild card" matcher that signals
// every index as a potential match.
matchTopic := make(matchAny, len(topicList))
for j, topic := range topicList {
matchTopic[j] = &singleMatcher{backend: backend, value: topicValue(topic)}
}
matchers[i+1] = matchTopic
}
// matcher is the final sequence matcher that signals a match when all underlying
// matchers signal a match for consecutive log value indices.
matcher := newMatchSequence(matchers)
// processEpoch returns the potentially matching logs from the given epoch.
processEpoch := func(epochIndex uint32) ([]*types.Log, error) {
var logs []*types.Log
// create a list of map indices to process
fm, lm := epochIndex<<logMapsPerEpoch, (epochIndex+1)<<logMapsPerEpoch-1
if fm < firstMap {
fm = firstMap
}
if lm > lastMap {
lm = lastMap
}
mapIndices := make([]uint32, lm+1-fm)
for i := range mapIndices {
mapIndices[i] = fm + uint32(i)
}
// find potential matches
matches, err := matcher.getMatches(ctx, mapIndices)
if err != nil {
return logs, err
}
// get the actual logs located at the matching log value indices
for _, m := range matches {
mlogs, err := getLogsFromMatches(ctx, backend, firstIndex, lastIndex, m)
if err != nil {
return logs, err
}
logs = append(logs, mlogs...)
}
return logs, nil
}
type task struct {
epochIndex uint32
logs []*types.Log
err error
done chan struct{}
}
taskCh := make(chan *task)
var wg sync.WaitGroup
defer func() {
close(taskCh)
wg.Wait()
}()
worker := func() {
for task := range taskCh {
if task == nil {
break
}
task.logs, task.err = processEpoch(task.epochIndex)
close(task.done)
}
wg.Done()
}
for i := 0; i < 4; i++ {
wg.Add(1)
go worker()
}
var logs []*types.Log
// startEpoch is the next task to send whenever a worker can accept it.
// waitEpoch is the next task we are waiting for to finish in order to append
// results in the correct order.
startEpoch, waitEpoch := firstEpoch, firstEpoch
tasks := make(map[uint32]*task)
tasks[startEpoch] = &task{epochIndex: startEpoch, done: make(chan struct{})}
for waitEpoch <= lastEpoch {
select {
case taskCh <- tasks[startEpoch]:
startEpoch++
if startEpoch <= lastEpoch {
if tasks[startEpoch] == nil {
tasks[startEpoch] = &task{epochIndex: startEpoch, done: make(chan struct{})}
}
}
case <-tasks[waitEpoch].done:
logs = append(logs, tasks[waitEpoch].logs...)
if err := tasks[waitEpoch].err; err != nil {
return logs, err
}
delete(tasks, waitEpoch)
waitEpoch++
if waitEpoch <= lastEpoch {
if tasks[waitEpoch] == nil {
tasks[waitEpoch] = &task{epochIndex: waitEpoch, done: make(chan struct{})}
}
}
}
}
return logs, nil
}
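// A minimal usage sketch (backend, firstBlock, lastBlock and addr are
// stand-ins of the example): search a block range for logs emitted by a
// single address with no topic restrictions; since the result may contain
// false positives, callers are expected to re-check the filter criteria:
//
//	logs, err := GetPotentialMatches(ctx, backend, firstBlock, lastBlock,
//		[]common.Address{addr}, nil)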
// getLogsFromMatches returns the list of potentially matching logs located at
// the given list of matching log indices. Matches outside the firstIndex to
// lastIndex range are not returned.
func getLogsFromMatches(ctx context.Context, backend MatcherBackend, firstIndex, lastIndex uint64, matches potentialMatches) ([]*types.Log, error) {
var logs []*types.Log
for _, match := range matches {
if match < firstIndex || match > lastIndex {
continue
}
log, err := backend.GetLogByLvIndex(ctx, match)
if err != nil {
return logs, err
}
if log != nil {
logs = append(logs, log)
}
}
return logs, nil
}
// matcher interface is defined so that individual address/topic matchers can be
// combined into a pattern matcher (see matchAny and matchSequence).
type matcher interface {
// getMatches takes a list of map indices and returns an equal number of
// potentialMatches, one for each corresponding map index.
// Note that the map index list is typically a list of the potentially
// interesting maps from an epoch, plus sometimes the first map of the next
// epoch if it is required for sequence matching.
getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error)
}
// singleMatcher implements matcher by returning matches for a single log value hash.
type singleMatcher struct {
backend MatcherBackend
value common.Hash
}
// getMatches implements matcher
func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
results := make([]potentialMatches, len(mapIndices))
for i, mapIndex := range mapIndices {
filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, s.value))
if err != nil {
return nil, err
}
results[i] = filterRow.potentialMatches(mapIndex, s.value)
}
return results, nil
}
// matchAny combines a set of matchers and returns a match for every position
// where any of the underlying matchers signaled a match. A zero-length matchAny
// acts as a "wild card" that signals a potential match at every position.
type matchAny []matcher
// getMatches implements matcher
func (m matchAny) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
if len(m) == 0 {
// return "wild card" results (potentialMatches(nil) is interpreted as a
// potential match at every log value index of the map).
return make([]potentialMatches, len(mapIndices)), nil
}
if len(m) == 1 {
return m[0].getMatches(ctx, mapIndices)
}
matches := make([][]potentialMatches, len(m))
for i, matcher := range m {
var err error
if matches[i], err = matcher.getMatches(ctx, mapIndices); err != nil {
return nil, err
}
}
results := make([]potentialMatches, len(mapIndices))
merge := make([]potentialMatches, len(m))
for i := range results {
for j := range merge {
merge[j] = matches[j][i]
}
results[i] = mergeResults(merge)
}
return results, nil
}
// mergeResults merges multiple lists of matches into a single one, preserving
// ascending order and filtering out any duplicates.
func mergeResults(results []potentialMatches) potentialMatches {
if len(results) == 0 {
return nil
}
var sumLen int
for _, res := range results {
if res == nil {
// nil is a wild card; all indices in map range are potential matches
return nil
}
sumLen += len(res)
}
merged := make(potentialMatches, 0, sumLen)
for {
best := -1
for i, res := range results {
if len(res) == 0 {
continue
}
if best < 0 || res[0] < results[best][0] {
best = i
}
}
if best < 0 {
return merged
}
if len(merged) == 0 || results[best][0] > merged[len(merged)-1] {
merged = append(merged, results[best][0])
}
results[best] = results[best][1:]
}
}
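// A small worked example of mergeResults: ascending order is preserved,
// duplicates are dropped and a nil (wild card) input short-circuits:
//
//	mergeResults([]potentialMatches{{1, 5}, {2, 5, 9}}) // -> {1, 2, 5, 9}
//	mergeResults([]potentialMatches{{1, 5}, nil})       // -> nil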
// matchSequence combines two matchers, a "base" and a "next" matcher with a
// positive integer offset so that the resulting matcher signals a match at log
// value index X when the base matcher returns a match at X and the next matcher
// gives a match at X+offset. Note that matchSequence can be used recursively to
// detect any log value sequence.
type matchSequence struct {
base, next matcher
offset uint64
// *EmptyRate == totalCount << 32 + emptyCount (atomically accessed)
baseEmptyRate, nextEmptyRate uint64
}
// newMatchSequence creates a recursive sequence matcher from a list of underlying
// matchers. The resulting matcher signals a match at log value index X when each
// underlying matcher matchers[i] returns a match at X+i.
func newMatchSequence(matchers []matcher) matcher {
if len(matchers) == 0 {
panic("zero length sequence matchers are not allowed")
}
if len(matchers) == 1 {
return matchers[0]
}
return &matchSequence{
base: newMatchSequence(matchers[:len(matchers)-1]),
next: matchers[len(matchers)-1],
offset: uint64(len(matchers) - 1),
}
}
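// A minimal sketch (b, addr and topic0 are stand-ins of the example): for a
// filter with one address and one topic in the first position, the resulting
// matcher signals index X when the address value matches at X and the topic
// value matches at X+1:
//
//	m := newMatchSequence([]matcher{
//		matchAny{&singleMatcher{backend: b, value: addressValue(addr)}},
//		matchAny{&singleMatcher{backend: b, value: topicValue(topic0)}},
//	})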
// getMatches implements matcher
func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
// decide whether to evaluate base or next matcher first
baseEmptyRate := atomic.LoadUint64(&m.baseEmptyRate)
nextEmptyRate := atomic.LoadUint64(&m.nextEmptyRate)
baseTotal, baseEmpty := baseEmptyRate>>32, uint64(uint32(baseEmptyRate))
nextTotal, nextEmpty := nextEmptyRate>>32, uint64(uint32(nextEmptyRate))
baseFirst := baseEmpty*nextTotal >= nextEmpty*baseTotal/2
var (
baseRes, nextRes []potentialMatches
baseIndices []uint32
)
if baseFirst {
// base first mode; request base matcher
baseIndices = mapIndices
var err error
baseRes, err = m.base.getMatches(ctx, baseIndices)
if err != nil {
return nil, err
}
}
// determine set of indices to request from next matcher
nextIndices := make([]uint32, 0, len(mapIndices)*3/2)
lastAdded := uint32(math.MaxUint32)
for i, mapIndex := range mapIndices {
if baseFirst && baseRes[i] != nil && len(baseRes[i]) == 0 {
// do not request map index from next matcher if no results from base matcher
continue
}
if lastAdded != mapIndex {
nextIndices = append(nextIndices, mapIndex)
lastAdded = mapIndex
}
if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<logValuesPerMap)-m.offset {
nextIndices = append(nextIndices, mapIndex+1)
lastAdded = mapIndex + 1
}
}
if len(nextIndices) != 0 {
// request next matcher
var err error
nextRes, err = m.next.getMatches(ctx, nextIndices)
if err != nil {
return nil, err
}
}
if !baseFirst {
// next first mode; determine set of indices to request from base matcher
baseIndices = make([]uint32, 0, len(mapIndices))
var nextPtr int
for _, mapIndex := range mapIndices {
// find corresponding results in nextRes
for nextPtr+1 < len(nextIndices) && nextIndices[nextPtr] < mapIndex {
nextPtr++
}
if nextPtr+1 >= len(nextIndices) {
break
}
if nextIndices[nextPtr] != mapIndex || nextIndices[nextPtr+1] != mapIndex+1 {
panic("invalid nextIndices")
}
next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1]
if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<logValuesPerMap)+m.offset) ||
next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<logValuesPerMap)+m.offset) {
baseIndices = append(baseIndices, mapIndex)
}
}
if len(baseIndices) != 0 {
// request base matcher
var err error
baseRes, err = m.base.getMatches(ctx, baseIndices)
if err != nil {
return nil, err
}
}
}
// all potential matches of base and next matchers obtained, update empty rates
for _, res := range baseRes {
if res != nil && len(res) == 0 {
atomic.AddUint64(&m.baseEmptyRate, 0x100000001)
} else {
atomic.AddUint64(&m.baseEmptyRate, 0x100000000)
}
}
for _, res := range nextRes {
if res != nil && len(res) == 0 {
atomic.AddUint64(&m.nextEmptyRate, 0x100000001)
} else {
atomic.AddUint64(&m.nextEmptyRate, 0x100000000)
}
}
// define iterator functions to find base/next matcher results by map index
var basePtr int
baseResult := func(mapIndex uint32) potentialMatches {
for basePtr < len(baseIndices) && baseIndices[basePtr] <= mapIndex {
if baseIndices[basePtr] == mapIndex {
return baseRes[basePtr]
}
basePtr++
}
return noMatches
}
var nextPtr int
nextResult := func(mapIndex uint32) potentialMatches {
for nextPtr < len(nextIndices) && nextIndices[nextPtr] <= mapIndex {
if nextIndices[nextPtr] == mapIndex {
return nextRes[nextPtr]
}
nextPtr++
}
return noMatches
}
// match corresponding base and next matcher results
results := make([]potentialMatches, len(mapIndices))
for i, mapIndex := range mapIndices {
results[i] = matchSequenceResults(mapIndex, m.offset, baseResult(mapIndex), nextResult(mapIndex), nextResult(mapIndex+1))
}
return results, nil
}
// matchSequenceResults returns a list of sequence matches for the given mapIndex
// and offset based on the base matcher's results at mapIndex and the next matcher's
// results at mapIndex and mapIndex+1. Note that acquiring nextNextRes may be
// skipped and it can be substituted with an empty list if baseRes has no potential
// matches that could be sequence matched with anything that could be in nextNextRes.
func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
// if nextRes is a wild card or baseRes is empty then the sequence matcher
// result equals baseRes.
return baseRes
}
if len(nextRes) > 0 {
// discard items from nextRes whose corresponding base matcher results
// with the negative offset applied would be located at mapIndex-1.
start := 0
for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<logValuesPerMap+offset {
start++
}
nextRes = nextRes[start:]
}
if len(nextNextRes) > 0 {
// discard items from nextNextRes whose corresponding base matcher results
// with the negative offset applied would still be located at mapIndex+1.
stop := 0
for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<logValuesPerMap+offset {
stop++
}
nextNextRes = nextNextRes[:stop]
}
maxLen := len(nextRes) + len(nextNextRes)
if maxLen == 0 {
return nextRes
}
if len(baseRes) < maxLen {
maxLen = len(baseRes)
}
// iterate through baseRes, nextRes and nextNextRes and collect matching results.
matchedRes := make(potentialMatches, 0, maxLen)
for _, nextRes := range []potentialMatches{nextRes, nextNextRes} {
if baseRes != nil {
for len(nextRes) > 0 && len(baseRes) > 0 {
if nextRes[0] > baseRes[0]+offset {
baseRes = baseRes[1:]
} else if nextRes[0] < baseRes[0]+offset {
nextRes = nextRes[1:]
} else {
matchedRes = append(matchedRes, baseRes[0])
baseRes = baseRes[1:]
nextRes = nextRes[1:]
}
}
} else {
// baseRes is a wild card so just return next matcher results with
// negative offset.
for len(nextRes) > 0 {
matchedRes = append(matchedRes, nextRes[0]-offset)
nextRes = nextRes[1:]
}
}
}
return matchedRes
}

@@ -18,6 +18,8 @@ package rawdb
import (
"bytes"
"encoding/binary"
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
@@ -179,3 +181,207 @@ func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
log.Crit("Failed to delete bloom bits", "err", it.Error())
}
}
var emptyRow = []uint32{}
// ReadFilterMapRow retrieves a filter map row at the given mapRowIndex
// (see filtermaps.mapRowIndex for the storage index encoding).
// Note that zero length rows are not stored in the database and therefore all
// non-existent entries are interpreted as empty rows and return no error.
// Also note that the mapRowIndex indexing scheme is the same as the one
// proposed in EIP-7745 for tree-hashing the filter map structure and for the
// same data proximity reasons it is also suitable for database representation.
// See also:
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
func ReadFilterMapRow(db ethdb.KeyValueReader, mapRowIndex uint64) ([]uint32, error) {
key := filterMapRowKey(mapRowIndex)
has, err := db.Has(key)
if err != nil {
return nil, err
}
if !has {
return emptyRow, nil
}
encRow, err := db.Get(key)
if err != nil {
return nil, err
}
if len(encRow)&3 != 0 {
return nil, errors.New("Invalid encoded filter row length")
}
row := make([]uint32, len(encRow)/4)
for i := range row {
row[i] = binary.LittleEndian.Uint32(encRow[i*4 : (i+1)*4])
}
return row, nil
}
// WriteFilterMapRow stores a filter map row at the given mapRowIndex or deletes
// any existing entry if the row is empty.
func WriteFilterMapRow(db ethdb.KeyValueWriter, mapRowIndex uint64, row []uint32) {
var err error
if len(row) > 0 {
encRow := make([]byte, len(row)*4)
for i, c := range row {
binary.LittleEndian.PutUint32(encRow[i*4:(i+1)*4], c)
}
err = db.Put(filterMapRowKey(mapRowIndex), encRow)
} else {
err = db.Delete(filterMapRowKey(mapRowIndex))
}
if err != nil {
log.Crit("Failed to store filter map row", "err", err)
}
}
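// Encoding sketch (hypothetical values): each column index is serialized as a
// little-endian uint32, so the row {1, 0x01020304} is stored as the 8 bytes
// 01 00 00 00 04 03 02 01, while an empty row is stored as an absent entry:
//
//	WriteFilterMapRow(db, 42, []uint32{1, 0x01020304})
//	row, _ := ReadFilterMapRow(db, 42) // row == []uint32{1, 0x01020304}
//	WriteFilterMapRow(db, 42, nil)     // deletes the entry again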
// ReadFilterMapBlockPtr retrieves the number of the block that generated the
// first log value entry of the given map.
func ReadFilterMapBlockPtr(db ethdb.KeyValueReader, mapIndex uint32) (uint64, error) {
encPtr, err := db.Get(filterMapBlockPtrKey(mapIndex))
if err != nil {
return 0, err
}
if len(encPtr) != 8 {
return 0, errors.New("Invalid block number encoding")
}
return binary.BigEndian.Uint64(encPtr), nil
}
// WriteFilterMapBlockPtr stores the number of the block that generated the
// first log value entry of the given map.
func WriteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32, blockNumber uint64) {
var encPtr [8]byte
binary.BigEndian.PutUint64(encPtr[:], blockNumber)
if err := db.Put(filterMapBlockPtrKey(mapIndex), encPtr[:]); err != nil {
log.Crit("Failed to store filter map block pointer", "err", err)
}
}
// DeleteFilterMapBlockPtr deletes the number of the block that generated the
// first log value entry of the given map.
func DeleteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32) {
if err := db.Delete(filterMapBlockPtrKey(mapIndex)); err != nil {
log.Crit("Failed to delete filter map block pointer", "err", err)
}
}
// ReadBlockLvPointer retrieves the starting log value index where the log values
// generated by the given block are located.
func ReadBlockLvPointer(db ethdb.KeyValueReader, blockNumber uint64) (uint64, error) {
encPtr, err := db.Get(blockLVKey(blockNumber))
if err != nil {
return 0, err
}
if len(encPtr) != 8 {
return 0, errors.New("Invalid log value pointer encoding")
}
return binary.BigEndian.Uint64(encPtr), nil
}
// WriteBlockLvPointer stores the starting log value index where the log values
// generated by the given block are located.
func WriteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber, lvPointer uint64) {
var encPtr [8]byte
binary.BigEndian.PutUint64(encPtr[:], lvPointer)
if err := db.Put(blockLVKey(blockNumber), encPtr[:]); err != nil {
log.Crit("Failed to store block log value pointer", "err", err)
}
}
// DeleteBlockLvPointer deletes the starting log value index where the log values
// generated by the given block are located.
func DeleteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber uint64) {
if err := db.Delete(blockLVKey(blockNumber)); err != nil {
log.Crit("Failed to delete block log value pointer", "err", err)
}
}
// FilterMapsRange is a storage representation of the block range covered by the
// filter maps structure and the corresponding log value index range.
type FilterMapsRange struct {
Initialized bool
HeadLvPointer, TailLvPointer uint64
HeadBlockNumber, TailBlockNumber uint64
HeadBlockHash, TailParentHash common.Hash
}
// ReadFilterMapsRange retrieves the filter maps range data. Note that a missing
// database entry is interpreted as a valid non-initialized state; in this case
// a blank range structure is returned without error.
func ReadFilterMapsRange(db ethdb.KeyValueReader) (FilterMapsRange, error) {
if has, err := db.Has(filterMapsRangeKey); !has || err != nil {
return FilterMapsRange{}, err
}
encRange, err := db.Get(filterMapsRangeKey)
if err != nil {
return FilterMapsRange{}, err
}
var fmRange FilterMapsRange
if err := rlp.DecodeBytes(encRange, &fmRange); err != nil {
return FilterMapsRange{}, err
}
return fmRange, nil
}
// WriteFilterMapsRange stores the filter maps range data.
func WriteFilterMapsRange(db ethdb.KeyValueWriter, fmRange FilterMapsRange) {
encRange, err := rlp.EncodeToBytes(&fmRange)
if err != nil {
log.Crit("Failed to encode filter maps range", "err", err)
}
if err := db.Put(filterMapsRangeKey, encRange); err != nil {
log.Crit("Failed to store filter maps range", "err", err)
}
}
// DeleteFilterMapsRange deletes the filter maps range data, which is
// interpreted as reverting to the non-initialized state.
func DeleteFilterMapsRange(db ethdb.KeyValueWriter) {
if err := db.Delete(filterMapsRangeKey); err != nil {
log.Crit("Failed to delete filter maps range", "err", err)
}
}
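// Usage sketch (hypothetical values; the indexer performs equivalent steps
// when advancing the indexed head):
//
//	fmRange, err := ReadFilterMapsRange(db)
//	if err != nil {
//		log.Crit("Failed to read filter maps range", "err", err)
//	}
//	fmRange.HeadBlockNumber, fmRange.HeadBlockHash = newNumber, newHash
//	WriteFilterMapsRange(db, fmRange)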
// RevertPoint is the storage representation of a filter maps revert point.
type RevertPoint struct {
BlockHash common.Hash
MapIndex uint32
RowLength []uint
}
// ReadRevertPoint retrieves the revert point for the given block number if
// present. Note that revert points may or may not exist for any block number
// and a non-existent entry causes no error.
func ReadRevertPoint(db ethdb.KeyValueReader, blockNumber uint64) (*RevertPoint, error) {
key := revertPointKey(blockNumber)
if has, err := db.Has(key); !has || err != nil {
return nil, err
}
enc, err := db.Get(key)
if err != nil {
return nil, err
}
rp := new(RevertPoint)
if err := rlp.DecodeBytes(enc, rp); err != nil {
return nil, err
}
return rp, nil
}
// WriteRevertPoint stores a revert point for the given block number.
func WriteRevertPoint(db ethdb.KeyValueWriter, blockNumber uint64, rp *RevertPoint) {
enc, err := rlp.EncodeToBytes(rp)
if err != nil {
log.Crit("Failed to encode revert point", "err", err)
}
if err := db.Put(revertPointKey(blockNumber), enc); err != nil {
log.Crit("Failed to store revert point", "err", err)
}
}
// DeleteRevertPoint deletes the given revert point.
func DeleteRevertPoint(db ethdb.KeyValueWriter, blockNumber uint64) {
if err := db.Delete(revertPointKey(blockNumber)); err != nil {
log.Crit("Failed to delete revert point", "err", err)
}
}
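// Lookup sketch: a nil result with a nil error means no revert point is
// stored for the given block, so callers should fall back to an earlier one:
//
//	rp, err := ReadRevertPoint(db, blockNumber)
//	if err != nil {
//		return err
//	}
//	if rp == nil {
//		// no revert point at this height; try a lower block number
//	}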

@ -145,6 +145,13 @@ var (
FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash
SyncCommitteeKey = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee
FilterMapsPrefix = []byte("fT5-") //TODO fm-
filterMapsRangeKey = append(FilterMapsPrefix, byte('R'))
filterMapRowPrefix = append(FilterMapsPrefix, byte('r')) // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row
filterMapBlockPtrPrefix = append(FilterMapsPrefix, byte('b')) // filterMapBlockPtrPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian)
blockLVPrefix = append(FilterMapsPrefix, byte('p')) // blockLVPrefix + num (uint64 big endian) -> log value pointer (uint64 big endian)
revertPointPrefix = append(FilterMapsPrefix, byte('v')) // revertPointPrefix + num (uint64 big endian) -> revert data
preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
)
@ -346,3 +353,27 @@ func IsStorageTrieNode(key []byte) bool {
ok, _, _ := ResolveStorageTrieNode(key)
return ok
}
// filterMapRowKey = filterMapRowPrefix + mapRowIndex (uint64 big endian)
func filterMapRowKey(mapRowIndex uint64) []byte {
key := append(filterMapRowPrefix, make([]byte, 8)...)
binary.BigEndian.PutUint64(key[len(filterMapRowPrefix):], mapRowIndex)
return key
}
// filterMapBlockPtrKey = filterMapBlockPtrPrefix + mapIndex (uint32 big endian)
func filterMapBlockPtrKey(mapIndex uint32) []byte {
key := append(filterMapBlockPtrPrefix, make([]byte, 4)...)
binary.BigEndian.PutUint32(key[len(filterMapBlockPtrPrefix):], mapIndex)
return key
}
// blockLVKey = blockLVPrefix + num (uint64 big endian)
func blockLVKey(number uint64) []byte {
return append(blockLVPrefix, encodeBlockNumber(number)...)
}
// revertPointKey = revertPointPrefix + num (uint64 big endian)
func revertPointKey(number uint64) []byte {
return append(revertPointPrefix, encodeBlockNumber(number)...)
}
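// Key layout sketch: with the current "fT5-" prefix, filterMapRowKey(1)
// produces the 13 byte key
//
//	[]byte{'f', 'T', '5', '-', 'r', 0, 0, 0, 0, 0, 0, 0, 1}
//
// i.e. the uint64 index is appended after the full 5-byte prefix, so all
// filter map rows sort together and consecutive indices stay adjacent in the
// database iteration order.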

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
@ -44,6 +45,7 @@ import (
// EthAPIBackend implements ethapi.Backend and tracers.Backend for full nodes
type EthAPIBackend struct {
*filtermaps.FilterMapsMatcherBackend
extRPCEnabled bool
allowUnprotectedTxs bool
eth *Ethereum

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
@ -83,6 +84,8 @@ type Ethereum struct {
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}
filterMaps *filtermaps.FilterMaps
APIBackend *EthAPIBackend
miner *miner.Miner
@ -221,6 +224,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
eth.bloomIndexer.Start(eth.blockchain)
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)
if config.BlobPool.Datadir != "" {
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
@ -255,7 +259,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.miner = miner.New(eth, config.Miner, eth.engine)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
eth.APIBackend = &EthAPIBackend{
FilterMapsMatcherBackend: (*filtermaps.FilterMapsMatcherBackend)(eth.filterMaps),
extRPCEnabled: stack.Config().ExtRPCEnabled(),
allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs,
eth: eth,
gpo: nil,
}
if eth.APIBackend.allowUnprotectedTxs {
log.Info("Unprotected transactions allowed")
}
@ -407,6 +417,7 @@ func (s *Ethereum) Stop() error {
// Then stop everything else.
s.bloomIndexer.Close()
s.filterMaps.Close()
close(s.closeBloomHandler)
s.txPool.Close()
s.blockchain.Stop()

@ -19,11 +19,16 @@ package filters
import (
"context"
"errors"
"fmt"
"math/big"
//"reflect"
"slices"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
@ -35,8 +40,9 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
bbMatchCount uint64
matcher *bloombits.Matcher
}
@ -148,16 +154,28 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return nil, err
}
logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
return logs, err
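// The filter maps matcher returns potential matches, which may include false
// positives, so filterLogs is applied below to the returned set in order to
// keep only the true matches.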
start := time.Now()
logs, err := filtermaps.GetPotentialMatches(ctx, f.sys.backend, uint64(f.begin), uint64(f.end), f.addresses, f.topics)
fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics)
fmt.Println("filtermaps (new) runtime", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs))
//TODO remove
/*f.bbMatchCount = 0
start = time.Now()
logChan, errChan := f.rangeLogsAsync(ctx)
var bbLogs []*types.Log
loop:
for {
select {
case log := <-logChan:
bbLogs = append(bbLogs, log)
case <-errChan:
break loop
}
}
}
fmt.Println("bloombits (old) runtime", time.Since(start), "true matches", len(bbLogs), "false positives", f.bbMatchCount-uint64(len(bbLogs)))
fmt.Println("DeepEqual", reflect.DeepEqual(fmLogs, bbLogs))*/
return fmLogs, err
}
// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
@ -218,6 +236,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *type
for {
select {
case number, ok := <-matches:
f.bbMatchCount++
// Abort if all matches have been fulfilled
if !ok {
err := session.Error()

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -71,6 +72,10 @@ type Backend interface {
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error)
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
}
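// In this commit the three new accessors above are satisfied for the eth
// service by embedding *filtermaps.FilterMapsMatcherBackend into
// EthAPIBackend (see the eth/api_backend.go and eth/backend.go hunks).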
// FilterSystem holds resources shared by all filters.

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@ -97,6 +98,10 @@ type Backend interface {
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error)
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
}
func GetAPIs(apiBackend Backend) []rpc.API {
