mirror of https://github.com/ethereum/go-ethereum
commit 3e36b4d56a (parent 8c73523812)

@ -0,0 +1,582 @@

package filtermaps

import (
	"context"
	"crypto/sha256"
	"encoding/binary"
	"errors"
	"sort"
	"sync"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/lru"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)
const (
	logMapHeight    = 12                    // log2(mapHeight)
	mapHeight       = 1 << logMapHeight     // filter map height (number of rows)
	logMapsPerEpoch = 6                     // log2(mapsPerEpoch)
	mapsPerEpoch    = 1 << logMapsPerEpoch  // number of maps in an epoch
	logValuesPerMap = 16                    // log2(valuesPerMap)
	valuesPerMap    = 1 << logValuesPerMap  // number of log values marked on each filter map

	headCacheSize = 8 // maximum number of recent filter maps cached in memory
)
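
// With the values above, each filter map covers 2^16 log values spread over
// 2^12 rows (16 marks per row on average), and an epoch groups 2^6 maps,
// i.e. 2^22 log values per epoch.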

// FilterMaps is the in-memory representation of the log index structure that is
// responsible for building and updating the index according to the canonical
// chain.
// Note that FilterMaps implements the same data structure as proposed in EIP-7745
// without the tree hashing and consensus changes:
// https://eips.ethereum.org/EIPS/eip-7745
type FilterMaps struct {
	lock    sync.RWMutex
	db      ethdb.KeyValueStore
	closeCh chan chan struct{}

	filterMapsRange
	chain *core.BlockChain

	// filterMapCache caches certain filter maps (headCacheSize most recent maps
	// and one tail map) that are expected to be frequently accessed and modified
	// while updating the structure. Note that the set of cached maps depends
	// only on filterMapsRange and rows of other maps are not cached here.
	filterMapLock  sync.Mutex
	filterMapCache map[uint32]*filterMap
	blockPtrCache  *lru.Cache[uint32, uint64]
	lvPointerCache *lru.Cache[uint64, uint64]
	revertPoints   map[uint64]*revertPoint
}

// filterMap is a full or partial in-memory representation of a filter map where
// rows are allowed to have a nil value meaning the row is not stored in the
// structure. Note that therefore a known empty row should be represented with
// a zero-length slice.
// It can be used as a memory cache or an overlay while preparing a batch of
// changes to the structure. In either case a nil value should be interpreted
// as transparent (uncached/unchanged).
type filterMap [mapHeight]FilterRow

// FilterRow encodes a single row of a filter map as a list of column indices.
// Note that the values are always stored in the same order as they were added
// and if the same column index is added twice, it is also stored twice.
// Order of column indices and potential duplications do not matter when searching
// for a value but leaving the original order makes reverting to a previous state
// simpler.
type FilterRow []uint32

// emptyRow represents an empty FilterRow. Note that in case of decoded FilterRows
// nil has a special meaning (transparent; not stored in the cache/overlay map)
// and therefore an empty row is represented by a zero-length slice.
var emptyRow = FilterRow{}

// filterMapsRange describes the block range that has been indexed and the log
// value index range it has been mapped to.
type filterMapsRange struct {
	initialized                      bool
	headLvPointer, tailLvPointer     uint64
	headBlockNumber, tailBlockNumber uint64
	headBlockHash, tailParentHash    common.Hash
}

// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain.
func NewFilterMaps(db ethdb.KeyValueStore, chain *core.BlockChain) *FilterMaps {
	rs, err := rawdb.ReadFilterMapsRange(db)
	if err != nil {
		log.Error("Error reading log index range", "error", err)
	}
	fm := &FilterMaps{
		db:      db,
		chain:   chain,
		closeCh: make(chan chan struct{}),
		filterMapsRange: filterMapsRange{
			initialized:     rs.Initialized,
			headLvPointer:   rs.HeadLvPointer,
			tailLvPointer:   rs.TailLvPointer,
			headBlockNumber: rs.HeadBlockNumber,
			tailBlockNumber: rs.TailBlockNumber,
			headBlockHash:   rs.HeadBlockHash,
			tailParentHash:  rs.TailParentHash,
		},
		filterMapCache: make(map[uint32]*filterMap),
		blockPtrCache:  lru.NewCache[uint32, uint64](1000),
		lvPointerCache: lru.NewCache[uint64, uint64](1000),
		revertPoints:   make(map[uint64]*revertPoint),
	}
	if !fm.initialized {
		fm.resetDb()
	}
	fm.updateMapCache()
	if rp, err := fm.newUpdateBatch().makeRevertPoint(); err == nil {
		fm.revertPoints[rp.blockNumber] = rp
	} else {
		log.Error("Error creating head revert point", "error", err)
	}
	go fm.updateLoop()
	return fm
}

// Close ensures that the indexer is fully stopped before returning.
func (f *FilterMaps) Close() {
	ch := make(chan struct{})
	f.closeCh <- ch
	<-ch
}

// FilterMapsMatcherBackend implements MatcherBackend.
type FilterMapsMatcherBackend FilterMaps

// GetFilterMapRow returns the given row of the given map. If the row is empty
// then a non-nil zero length row is returned.
// Note that the returned slices should not be modified, they should be copied
// on write.
// GetFilterMapRow implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
	f := (*FilterMaps)(ff)
	return f.getFilterMapRow(mapIndex, rowIndex)
}

// GetBlockLvPointer returns the starting log value index where the log values
// generated by the given block are located. If blockNumber is beyond the current
// head then the first unoccupied log value index is returned.
// GetBlockLvPointer implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
	f := (*FilterMaps)(ff)
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.getBlockLvPointer(blockNumber)
}

// GetLogByLvIndex returns the log at the given log value index. If the index does
// not point to the first log value entry of a log then no log and no error are
// returned as this can happen when the log value index was a false positive.
// Note that this function assumes that the log index structure is consistent
// with the canonical chain at the point where the given log value index points.
// If this is not the case then an invalid result or an error may be returned.
// GetLogByLvIndex implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
	f := (*FilterMaps)(ff)
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.getLogByLvIndex(lvIndex)
}

// reset un-initializes the FilterMaps structure and removes all related data from
// the database.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) reset() {
	// deleting the range first ensures that resetDb will be called again at next
	// startup and any leftover data will be removed even if it cannot finish now.
	rawdb.DeleteFilterMapsRange(f.db)
	f.resetDb()
	f.filterMapsRange = filterMapsRange{}
	f.filterMapCache = make(map[uint32]*filterMap)
	f.revertPoints = make(map[uint64]*revertPoint)
	f.blockPtrCache.Purge()
	f.lvPointerCache.Purge()
}

// resetDb removes all log index data from the database.
func (f *FilterMaps) resetDb() {
	var logged bool
	for {
		it := f.db.NewIterator(rawdb.FilterMapsPrefix, nil)
		batch := f.db.NewBatch()
		var count int
		for ; count < 10000 && it.Next(); count++ {
			batch.Delete(it.Key())
		}
		it.Release()
		if count == 0 {
			break
		}
		if !logged {
			log.Info("Resetting log index database...")
			logged = true
		}
		batch.Write()
	}
	if logged {
		log.Info("Resetting log index database finished")
	}
}

// setRange updates the covered range and also adds the changes to the given batch.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
	f.filterMapsRange = newRange
	rs := rawdb.FilterMapsRange{
		Initialized:     newRange.initialized,
		HeadLvPointer:   newRange.headLvPointer,
		TailLvPointer:   newRange.tailLvPointer,
		HeadBlockNumber: newRange.headBlockNumber,
		TailBlockNumber: newRange.tailBlockNumber,
		HeadBlockHash:   newRange.headBlockHash,
		TailParentHash:  newRange.tailParentHash,
	}
	rawdb.WriteFilterMapsRange(batch, rs)
	f.updateMapCache()
}

// updateMapCache updates the maps covered by the filterMapCache according to the
// covered range.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) updateMapCache() {
	if !f.initialized {
		return
	}
	f.filterMapLock.Lock()
	defer f.filterMapLock.Unlock()

	newFilterMapCache := make(map[uint32]*filterMap)
	firstMap, afterLastMap := uint32(f.tailLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
	headCacheFirst := firstMap + 1
	if afterLastMap > headCacheFirst+headCacheSize {
		headCacheFirst = afterLastMap - headCacheSize
	}
	fm := f.filterMapCache[firstMap]
	if fm == nil {
		fm = new(filterMap)
	}
	newFilterMapCache[firstMap] = fm
	for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ {
		fm := f.filterMapCache[mapIndex]
		if fm == nil {
			fm = new(filterMap)
		}
		newFilterMapCache[mapIndex] = fm
	}
	f.filterMapCache = newFilterMapCache
}

// getLogByLvIndex returns the log at the given log value index. If the index does
// not point to the first log value entry of a log then no log and no error are
// returned as this can happen when the log value index was a false positive.
// Note that this function assumes that the log index structure is consistent
// with the canonical chain at the point where the given log value index points.
// If this is not the case then an invalid result or an error may be returned.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
	if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
		return nil, errors.New("log value index outside available range")
	}
	// find possible block range based on map to block pointers
	mapIndex := uint32(lvIndex >> logValuesPerMap)
	firstBlockNumber, err := f.getMapBlockPtr(mapIndex)
	if err != nil {
		return nil, err
	}
	var lastBlockNumber uint64
	if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) {
		lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
		if err != nil {
			return nil, err
		}
	} else {
		lastBlockNumber = f.headBlockNumber
	}
	// find block with binary search based on block to log value index pointers
	for firstBlockNumber < lastBlockNumber {
		midBlockNumber := (firstBlockNumber + lastBlockNumber + 1) / 2
		midLvPointer, err := f.getBlockLvPointer(midBlockNumber)
		if err != nil {
			return nil, err
		}
		if lvIndex < midLvPointer {
			lastBlockNumber = midBlockNumber - 1
		} else {
			firstBlockNumber = midBlockNumber
		}
	}
	// get block receipts
	hash := f.chain.GetCanonicalHash(firstBlockNumber)
	receipts := f.chain.GetReceiptsByHash(hash) //TODO small cache
	if receipts == nil {
		return nil, errors.New("receipts not found")
	}
	lvPointer, err := f.getBlockLvPointer(firstBlockNumber)
	if err != nil {
		return nil, err
	}
	// iterate through receipts to find the exact log starting at lvIndex
	for _, receipt := range receipts {
		for _, log := range receipt.Logs {
			if lvPointer > lvIndex {
				// lvIndex does not point to the first log value (address value)
				// generated by a log as true matches should always do, so it
				// is considered a false positive (no log and no error returned).
				return nil, nil
			}
			if lvPointer == lvIndex {
				return log, nil // potential match
			}
			lvPointer += uint64(len(log.Topics) + 1)
		}
	}
	return nil, errors.New("log value index not found")
}

// getFilterMapRow returns the given row of the given map. If the row is empty
// then a non-nil zero length row is returned.
// Note that the returned slices should not be modified, they should be copied
// on write.
func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, error) {
	f.filterMapLock.Lock()
	defer f.filterMapLock.Unlock()

	fm := f.filterMapCache[mapIndex]
	if fm != nil && fm[rowIndex] != nil {
		return fm[rowIndex], nil
	}
	row, err := rawdb.ReadFilterMapRow(f.db, mapRowIndex(mapIndex, rowIndex))
	if err != nil {
		return nil, err
	}
	if fm != nil {
		fm[rowIndex] = FilterRow(row)
	}
	return FilterRow(row), nil
}

// storeFilterMapRow stores a row at the given row index of the given map and also
// caches it in filterMapCache if the given map is cached.
// Note that empty rows are not stored in the database and therefore there is no
// separate delete function; deleting a row is the same as storing an empty row.
func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uint32, row FilterRow) {
	f.filterMapLock.Lock()
	defer f.filterMapLock.Unlock()

	if fm := f.filterMapCache[mapIndex]; fm != nil {
		(*fm)[rowIndex] = row
	}
	rawdb.WriteFilterMapRow(batch, mapRowIndex(mapIndex, rowIndex), []uint32(row))
}

// mapRowIndex calculates the unified storage index where the given row of the
// given map is stored. Note that this indexing scheme is the same as the one
// proposed in EIP-7745 for tree-hashing the filter map structure and for the
// same data proximity reasons it is also suitable for database representation.
// See also:
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
func mapRowIndex(mapIndex, rowIndex uint32) uint64 {
	epochIndex, mapSubIndex := mapIndex>>logMapsPerEpoch, mapIndex%mapsPerEpoch
	return (uint64(epochIndex)<<logMapHeight+uint64(rowIndex))<<logMapsPerEpoch + uint64(mapSubIndex)
}
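
// For example, map index 70 belongs to epoch 1 with sub-index 6, so row 3 of
// that map is stored at ((1<<12)+3)<<6 + 6 = 262342. Rows with the same row
// index within an epoch are therefore stored next to each other.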

// getBlockLvPointer returns the starting log value index where the log values
// generated by the given block are located. If blockNumber is beyond the current
// head then the first unoccupied log value index is returned.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
	if blockNumber > f.headBlockNumber {
		return f.headLvPointer, nil
	}
	if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
		return lvPointer, nil
	}
	lvPointer, err := rawdb.ReadBlockLvPointer(f.db, blockNumber)
	if err != nil {
		return 0, err
	}
	f.lvPointerCache.Add(blockNumber, lvPointer)
	return lvPointer, nil
}

// storeBlockLvPointer stores the starting log value index where the log values
// generated by the given block are located.
func (f *FilterMaps) storeBlockLvPointer(batch ethdb.Batch, blockNumber, lvPointer uint64) {
	f.lvPointerCache.Add(blockNumber, lvPointer)
	rawdb.WriteBlockLvPointer(batch, blockNumber, lvPointer)
}

// deleteBlockLvPointer deletes the starting log value index where the log values
// generated by the given block are located.
func (f *FilterMaps) deleteBlockLvPointer(batch ethdb.Batch, blockNumber uint64) {
	f.lvPointerCache.Remove(blockNumber)
	rawdb.DeleteBlockLvPointer(batch, blockNumber)
}

// getMapBlockPtr returns the number of the block that generated the first log
// value entry of the given map.
func (f *FilterMaps) getMapBlockPtr(mapIndex uint32) (uint64, error) {
	if blockPtr, ok := f.blockPtrCache.Get(mapIndex); ok {
		return blockPtr, nil
	}
	blockPtr, err := rawdb.ReadFilterMapBlockPtr(f.db, mapIndex)
	if err != nil {
		return 0, err
	}
	f.blockPtrCache.Add(mapIndex, blockPtr)
	return blockPtr, nil
}

// storeMapBlockPtr stores the number of the block that generated the first log
// value entry of the given map.
func (f *FilterMaps) storeMapBlockPtr(batch ethdb.Batch, mapIndex uint32, blockPtr uint64) {
	f.blockPtrCache.Add(mapIndex, blockPtr)
	rawdb.WriteFilterMapBlockPtr(batch, mapIndex, blockPtr)
}

// deleteMapBlockPtr deletes the number of the block that generated the first log
// value entry of the given map.
func (f *FilterMaps) deleteMapBlockPtr(batch ethdb.Batch, mapIndex uint32) {
	f.blockPtrCache.Remove(mapIndex)
	rawdb.DeleteFilterMapBlockPtr(batch, mapIndex)
}

// addressValue returns the log value hash of a log emitting address.
func addressValue(address common.Address) common.Hash {
	var result common.Hash
	hasher := sha256.New()
	hasher.Write(address[:])
	hasher.Sum(result[:0])
	return result
}

// topicValue returns the log value hash of a log topic.
func topicValue(topic common.Hash) common.Hash {
	var result common.Hash
	hasher := sha256.New()
	hasher.Write(topic[:])
	hasher.Sum(result[:0])
	return result
}

// rowIndex returns the row index in which the given log value should be marked
// during the given epoch. Note that row assignments are re-shuffled in every
// epoch in order to ensure that even though there are always a few more heavily
// used rows due to very popular addresses and topics, these will not make search
// for other log values very expensive. Even if certain values are occasionally
// sorted into these heavy rows, in most of the epochs they are placed in average
// length rows.
func rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
	hasher := sha256.New()
	hasher.Write(logValue[:])
	var indexEnc [4]byte
	binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
	hasher.Write(indexEnc[:])
	var hash common.Hash
	hasher.Sum(hash[:0])
	return binary.LittleEndian.Uint32(hash[:4]) % mapHeight
}

// columnIndex returns the column index that should be added to the appropriate
// row in order to place a mark for the next log value.
func columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
	x := uint32(lvIndex % valuesPerMap) // log value sub-index
	transformHash := transformHash(uint32(lvIndex/valuesPerMap), logValue)
	// apply column index transformation function
	x += binary.LittleEndian.Uint32(transformHash[0:4])
	x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
	x ^= binary.LittleEndian.Uint32(transformHash[8:12])
	x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
	x += binary.LittleEndian.Uint32(transformHash[16:20])
	x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
	x ^= binary.LittleEndian.Uint32(transformHash[24:28])
	x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
	return x
}
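
// Every step of the transformation above is invertible modulo 2^32: the
// additions and XORs can be undone directly, and multiplying by an odd number
// (2k+1 is always odd) can be undone by multiplying with its modular inverse
// (see uint32ModInverse). potentialMatches applies these inverses in reverse
// order to recover candidate log value sub-indices.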

// transformHash calculates a hash specific to a given map and log value hash
// that defines a bijective function on the uint32 range. This function is used
// to transform the log value sub-index (distance from the first index of the map)
// into a 32 bit column index, then applied in reverse when searching for potential
// matches for a given log value.
func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
	hasher := sha256.New()
	hasher.Write(logValue[:])
	var indexEnc [4]byte
	binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
	hasher.Write(indexEnc[:])
	hasher.Sum(result[:0])
	return
}

// potentialMatches returns the list of log value indices potentially matching
// the given log value hash in the range of the filter map the row belongs to.
// Note that the list of indices is always sorted and potential duplicates are
// removed. Though the column indices are stored in the same order they were
// added and therefore the true matches are automatically reverse transformed
// in the right order, false positives can ruin this property. Since these can
// only be separated from true matches after the combined pattern matching of the
// outputs of individual log value matchers and this pattern matcher assumes a
// sorted and duplicate-free list of indices, we should ensure these properties
// here.
func (row FilterRow) potentialMatches(mapIndex uint32, logValue common.Hash) potentialMatches {
	results := make(potentialMatches, 0, 8)
	transformHash := transformHash(mapIndex, logValue)
	sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
	mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
	xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
	mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
	sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
	mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
	xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
	mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
	// perform reverse column index transformation on all column indices of the row.
	// if a column index was added by the searched log value then the reverse
	// transform will yield a valid log value sub-index of the given map.
	// Column index is 32 bits long while there are 2**16 valid log value indices
	// in the map's range, so this can also happen by accident with 1 in 2**16
	// chance, in which case we have a false positive.
	for _, columnIndex := range row {
		if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < valuesPerMap {
			results = append(results, uint64(mapIndex)*valuesPerMap+uint64(potentialSubIndex))
		}
	}
	sort.Sort(results)
	// remove duplicates
	j := 0
	for i, match := range results {
		if i == 0 || match != results[i-1] {
			results[j] = results[i]
			j++
		}
	}
	return results[:j]
}

// potentialMatches is a strictly monotonically increasing list of log value
// indices in the range of a filter map that are potential matches for certain
// filter criteria.
// Note that nil is used as a wildcard and therefore means that all log value
// indices in the filter map range are potential matches. If there are no
// potential matches in the given map's range then an empty slice should be used.
type potentialMatches []uint64

// noMatches means there are no potential matches in a given filter map's range.
var noMatches = potentialMatches{}

func (p potentialMatches) Len() int           { return len(p) }
func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
func (p potentialMatches) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

// uint32ModInverse takes an odd 32 bit number and returns its modular
// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y
// x * y * uint32ModInverse(y) == 1.
func uint32ModInverse(v uint32) uint32 {
	if v&1 == 0 {
		panic("uint32ModInverse called with even argument")
	}
	m := int64(1) << 32
	m0 := m
	a := int64(v)
	x, y := int64(1), int64(0)
	for a > 1 {
		q := a / m
		m, a = a%m, m
		x, y = y, x-q*y
	}
	if x < 0 {
		x += m0
	}
	return uint32(x)
}
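
// Example: uint32ModInverse(3) == 0xaaaaaaab, since
// 3 * 0xaaaaaaab == 2^33 + 1 ≡ 1 (mod 2^32).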

@ -0,0 +1,618 @@

package filtermaps

import (
	"errors"
	"math"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

const (
	startLvPointer       = valuesPerMap << 31 // log value index assigned to init block
	removedPointer       = math.MaxUint64     // used in updateBatch to signal removed items
	revertPointFrequency = 256                // frequency of revert points in database
	cachedRevertPoints   = 64                 // revert points for most recent blocks in memory
)
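
// Note: startLvPointer corresponds to map index 2^31, the middle of the 32 bit
// map index range, which leaves room for extending the index backwards from
// the init block toward the genesis block (see tryExtendTail).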

// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
	headEventCh := make(chan core.ChainHeadEvent)
	sub := f.chain.SubscribeChainHeadEvent(headEventCh)
	defer sub.Unsubscribe()

	head := f.chain.CurrentBlock()
	if head == nil {
		select {
		case ev := <-headEventCh:
			head = ev.Block.Header()
		case ch := <-f.closeCh:
			close(ch)
			return
		}
	}
	fmr := f.getRange()

	var stop bool
	wait := func() {
		if stop {
			return
		}
		select {
		case ev := <-headEventCh:
			head = ev.Block.Header()
		case <-time.After(time.Second * 20):
			// keep updating log index during syncing
			head = f.chain.CurrentBlock()
		case ch := <-f.closeCh:
			close(ch)
			stop = true
		}
	}

	for !stop {
		if !fmr.initialized {
			f.tryInit(head)
			fmr = f.getRange()
			if !fmr.initialized {
				wait()
				continue
			}
		}
		// log index is initialized
		if fmr.headBlockHash != head.Hash() {
			f.tryUpdateHead(head)
			fmr = f.getRange()
			if fmr.headBlockHash != head.Hash() {
				wait()
				continue
			}
		}
		// log index head is at latest chain head; process tail blocks if possible
		f.tryExtendTail(func() bool {
			// return true if tail processing needs to be stopped
			select {
			case ev := <-headEventCh:
				head = ev.Block.Header()
			case ch := <-f.closeCh:
				close(ch)
				stop = true
				return true
			default:
				head = f.chain.CurrentBlock()
			}
			// stop if there is a new chain head (always prioritize head updates)
			return fmr.headBlockHash != head.Hash()
		})
		if fmr.headBlockHash == head.Hash() {
			// if tail processing exited while there is no new head then no more
			// tail blocks can be processed
			wait()
		}
	}
}

// getRange returns the current filterMapsRange.
func (f *FilterMaps) getRange() filterMapsRange {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.filterMapsRange
}

// tryInit attempts to initialize the log index structure.
func (f *FilterMaps) tryInit(head *types.Header) {
	receipts := f.chain.GetReceiptsByHash(head.Hash())
	if receipts == nil {
		log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
		return
	}
	update := f.newUpdateBatch()
	if err := update.initWithBlock(head, receipts); err != nil {
		log.Error("Could not initialize log index", "error", err)
	}
	f.applyUpdateBatch(update)
}

// tryUpdateHead attempts to update the log index with a new head. If necessary,
// it reverts to a common ancestor with the old head before adding new block logs.
// If no suitable revert point is available (probably a reorg just after init)
// then it resets the index and tries to re-initialize with the new head.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
	// iterate back from new head until the log index head or a revert point and
	// collect headers of blocks to be added
	var (
		newHeaders []*types.Header
		chainPtr   = newHead
		rp         *revertPoint
	)
	for {
		if rp == nil || chainPtr.Number.Uint64() < rp.blockNumber {
			var err error
			rp, err = f.getRevertPoint(chainPtr.Number.Uint64())
			if err != nil {
				log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err)
				return
			}
			if rp == nil {
				// there are no more revert points available so we should reset and re-initialize
				log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64())
				f.reset()
				f.tryInit(newHead)
				return
			}
		}
		if chainPtr.Hash() == rp.blockHash {
			// revert point found at an ancestor of the new head
			break
		}
		// keep iterating backwards and collecting headers; remember the parent's
		// coordinates before stepping back so they can still be logged if the
		// lookup fails
		newHeaders = append(newHeaders, chainPtr)
		parentHash, parentNumber := chainPtr.ParentHash, chainPtr.Number.Uint64()-1
		chainPtr = f.chain.GetHeader(parentHash, parentNumber)
		if chainPtr == nil {
			log.Error("Canonical header not found", "number", parentNumber, "hash", parentHash)
			return
		}
	}
	if rp.blockHash != f.headBlockHash {
		if rp.blockNumber+128 <= f.headBlockNumber {
			log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", chainPtr.Number.Uint64())
		}
		if err := f.revertTo(rp); err != nil {
			log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err)
			return
		}
	}

	if newHeaders == nil {
		return
	}
	// add logs of new blocks in reverse order
	update := f.newUpdateBatch()
	for i := len(newHeaders) - 1; i >= 0; i-- {
		newHeader := newHeaders[i]
		receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
		if receipts == nil {
			log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
			break
		}
		if err := update.addBlockToHead(newHeader, receipts); err != nil {
			log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
			break
		}
		if update.updatedRangeLength() >= mapsPerEpoch {
			// limit the amount of data updated in a single batch
			f.applyUpdateBatch(update)
			update = f.newUpdateBatch()
		}
	}
	f.applyUpdateBatch(update)
}

// tryExtendTail attempts to extend the log index backwards until it indexes the
// genesis block or cannot find more block receipts. Since this is a long process,
// stopFn is called after adding each tail block and if it returns true, the
// latest batch is written and the function returns.
func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
	fmr := f.getRange()
	number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
	if number == 0 {
		return
	}
	update := f.newUpdateBatch()
	lastTailEpoch := update.tailEpoch()
	for number > 0 && !stopFn() {
		if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
			// limit the amount of data updated in a single batch
			f.applyUpdateBatch(update)
			update = f.newUpdateBatch()
			lastTailEpoch = tailEpoch
		}
		newTail := f.chain.GetHeader(parentHash, number-1)
		if newTail == nil {
			log.Error("Tail header not found", "number", number-1, "hash", parentHash)
			break
		}
		receipts := f.chain.GetReceiptsByHash(newTail.Hash())
		if receipts == nil {
			log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash())
			break
		}
		if err := update.addBlockToTail(newTail, receipts); err != nil {
			log.Error("Error adding tail block", "number", newTail.Number, "hash", newTail.Hash(), "error", err)
			break
		}
		number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
	}
	f.applyUpdateBatch(update)
}

// updateBatch is a memory overlay collecting changes to the log index structure
// that can be written to the database in a single batch while the in-memory
// representations in FilterMaps are also updated.
type updateBatch struct {
	filterMapsRange
	maps                   map[uint32]*filterMap // nil rows are unchanged
	getFilterMapRow        func(mapIndex, rowIndex uint32) (FilterRow, error)
	blockLvPointer         map[uint64]uint64 // removedPointer means delete
	mapBlockPtr            map[uint32]uint64 // removedPointer means delete
	revertPoints           map[uint64]*revertPoint
	firstMap, afterLastMap uint32
}

// newUpdateBatch creates a new updateBatch.
func (f *FilterMaps) newUpdateBatch() *updateBatch {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return &updateBatch{
		filterMapsRange: f.filterMapsRange,
		maps:            make(map[uint32]*filterMap),
		getFilterMapRow: f.getFilterMapRow,
		blockLvPointer:  make(map[uint64]uint64),
		mapBlockPtr:     make(map[uint32]uint64),
		revertPoints:    make(map[uint64]*revertPoint),
	}
}

// applyUpdateBatch creates a write batch, writes all changes to the database
// and also updates the in-memory representations of log index data.
func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
	f.lock.Lock()
	defer f.lock.Unlock()

	batch := f.db.NewBatch()
	// write or remove block to log value index pointers
	for blockNumber, lvPointer := range u.blockLvPointer {
		if lvPointer != removedPointer {
			f.storeBlockLvPointer(batch, blockNumber, lvPointer)
		} else {
			f.deleteBlockLvPointer(batch, blockNumber)
		}
	}
	// write or remove filter map to block number pointers
	for mapIndex, blockNumber := range u.mapBlockPtr {
		if blockNumber != removedPointer {
			f.storeMapBlockPtr(batch, mapIndex, blockNumber)
		} else {
			f.deleteMapBlockPtr(batch, mapIndex)
		}
	}
	// write filter map rows
	for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
		for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ {
			if fm := u.maps[mapIndex]; fm != nil {
				if row := (*fm)[rowIndex]; row != nil {
					f.storeFilterMapRow(batch, mapIndex, rowIndex, row)
				}
			}
		}
	}
	// if the head was rolled back, delete the revert points above the new head
	if u.headBlockNumber < f.headBlockNumber {
		for b := u.headBlockNumber + 1; b <= f.headBlockNumber; b++ {
			delete(f.revertPoints, b)
			if b%revertPointFrequency == 0 {
				rawdb.DeleteRevertPoint(batch, b)
			}
		}
	}
	// if the head advanced, evict revert points that fell out of the cached window
	if u.headBlockNumber > f.headBlockNumber {
		for b := f.headBlockNumber + 1; b <= u.headBlockNumber; b++ {
			delete(f.revertPoints, b-cachedRevertPoints)
		}
	}
	// store new revert points in database and/or memory
	for b, rp := range u.revertPoints {
		if b+cachedRevertPoints > u.headBlockNumber {
			f.revertPoints[b] = rp
		}
		if b%revertPointFrequency == 0 {
			rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{
				BlockHash: rp.blockHash,
				MapIndex:  rp.mapIndex,
				RowLength: rp.rowLength[:],
			})
		}
	}
	// update filterMapsRange
	f.setRange(batch, u.filterMapsRange)
	if err := batch.Write(); err != nil {
		log.Crit("Could not write update batch", "error", err)
	}
	log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailLvPointer)
}

// updatedRangeLength returns the length of the updated filter map range.
func (u *updateBatch) updatedRangeLength() uint32 {
	return u.afterLastMap - u.firstMap
}

// tailEpoch returns the tail epoch index.
func (u *updateBatch) tailEpoch() uint32 {
	return uint32(u.tailLvPointer >> (logValuesPerMap + logMapsPerEpoch))
}

// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
// did not have a modified version of the given row yet, it is retrieved using the
// request function from the backing FilterMaps cache or database and copied
// before modification.
func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
	fm := u.maps[mapIndex]
	if fm == nil {
		fm = new(filterMap)
		u.maps[mapIndex] = fm
		if mapIndex < u.firstMap || u.afterLastMap == 0 {
			u.firstMap = mapIndex
		}
		if mapIndex >= u.afterLastMap {
			u.afterLastMap = mapIndex + 1
		}
	}
	rowPtr := &(*fm)[rowIndex]
	if *rowPtr == nil {
		if filterRow, err := u.getFilterMapRow(mapIndex, rowIndex); err == nil {
			// filterRow is read only, copy before write
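			// (the extra capacity presumably leaves room for a few appended
			// marks before the slice has to grow)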
			*rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8)
			copy(*rowPtr, filterRow)
		} else {
			return nil, err
		}
	}
	return rowPtr, nil
}

// initWithBlock initializes the log index with the given block as head.
func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts) error {
	if u.initialized {
		return errors.New("already initialized")
	}
	u.initialized = true
	u.headLvPointer, u.tailLvPointer = startLvPointer, startLvPointer
	u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() //TODO genesis?
	u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
	u.addBlockToHead(header, receipts)
	return nil
}

// addValueToHead adds a single log value to the head of the log index.
func (u *updateBatch) addValueToHead(logValue common.Hash) error {
	mapIndex := uint32(u.headLvPointer >> logValuesPerMap)
	rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
	if err != nil {
		return err
	}
	column := columnIndex(u.headLvPointer, logValue)
	*rowPtr = append(*rowPtr, column)
	u.headLvPointer++
	return nil
}

// addBlockToHead adds the logs of the given block to the head of the log index.
// It also adds block to log value index and filter map to block pointers and
// a new revert point.
func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receipts) error {
	if !u.initialized {
		return errors.New("not initialized")
	}
	if header.ParentHash != u.headBlockHash {
		return errors.New("addBlockToHead parent mismatch")
	}
	number := header.Number.Uint64()
	u.blockLvPointer[number] = u.headLvPointer
	startMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
	if err := iterateReceipts(receipts, u.addValueToHead); err != nil {
		return err
	}
	stopMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
	for m := startMap; m < stopMap; m++ {
		u.mapBlockPtr[m] = number
	}
	u.headBlockNumber, u.headBlockHash = number, header.Hash()
	if (u.headBlockNumber-cachedRevertPoints)%revertPointFrequency != 0 {
		delete(u.revertPoints, u.headBlockNumber-cachedRevertPoints)
	}
	if rp, err := u.makeRevertPoint(); err != nil {
		return err
	} else if rp != nil {
		u.revertPoints[u.headBlockNumber] = rp
	}
	return nil
}

// addValueToTail adds a single log value to the tail of the log index.
func (u *updateBatch) addValueToTail(logValue common.Hash) error {
	if u.tailLvPointer == 0 {
		return errors.New("tail log value pointer underflow")
	}
	u.tailLvPointer--
	mapIndex := uint32(u.tailLvPointer >> logValuesPerMap)
	rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
	if err != nil {
		return err
	}
	column := columnIndex(u.tailLvPointer, logValue)
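	// prepend the new column index so the row stays ordered by the log value
	// index that generated each mark, matching the order produced at the head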
	*rowPtr = append(*rowPtr, 0)
	copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
	(*rowPtr)[0] = column
	return nil
}

// addBlockToTail adds the logs of the given block to the tail of the log index.
// It also adds block to log value index and filter map to block pointers.
func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receipts) error {
	if !u.initialized {
		return errors.New("not initialized")
	}
	if header.Hash() != u.tailParentHash {
		return errors.New("addBlockToTail parent mismatch")
	}
	number := header.Number.Uint64()
	stopMap := uint32((u.tailLvPointer + valuesPerMap - 1) >> logValuesPerMap)
	var cnt int
	if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error {
		cnt++
		return u.addValueToTail(lv)
	}); err != nil {
		return err
	}
	startMap := uint32(u.tailLvPointer >> logValuesPerMap)
	for m := startMap; m < stopMap; m++ {
		u.mapBlockPtr[m] = number
	}
	u.blockLvPointer[number] = u.tailLvPointer
	u.tailBlockNumber, u.tailParentHash = number, header.ParentHash
	return nil
}

// iterateReceipts iterates the given block receipts, generates log value hashes
// and passes them to the given callback function as a parameter.
func iterateReceipts(receipts types.Receipts, valueCb func(common.Hash) error) error {
	for _, receipt := range receipts {
		for _, log := range receipt.Logs {
			if err := valueCb(addressValue(log.Address)); err != nil {
				return err
			}
			for _, topic := range log.Topics {
				if err := valueCb(topicValue(topic)); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

// iterateReceiptsReverse iterates the given block receipts, generates log value
// hashes in reverse order and passes them to the given callback function as a
// parameter.
func iterateReceiptsReverse(receipts types.Receipts, valueCb func(common.Hash) error) error {
	for i := len(receipts) - 1; i >= 0; i-- {
		logs := receipts[i].Logs
		for j := len(logs) - 1; j >= 0; j-- {
			log := logs[j]
			for k := len(log.Topics) - 1; k >= 0; k-- {
				if err := valueCb(topicValue(log.Topics[k])); err != nil {
					return err
				}
			}
			if err := valueCb(addressValue(log.Address)); err != nil {
				return err
			}
		}
	}
	return nil
}

// revertPoint can be used to revert the log index to a certain head block.
type revertPoint struct {
	blockNumber uint64
	blockHash   common.Hash
	mapIndex    uint32
	rowLength   [mapHeight]uint
}

// makeRevertPoint creates a new revertPoint.
func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
	rp := &revertPoint{
		blockNumber: u.headBlockNumber,
		blockHash:   u.headBlockHash,
		mapIndex:    uint32(u.headLvPointer >> logValuesPerMap),
	}
	if u.tailLvPointer > uint64(rp.mapIndex)<<logValuesPerMap {
		return nil, nil
	}
	for i := range rp.rowLength[:] {
		var row FilterRow
		if m := u.maps[rp.mapIndex]; m != nil {
			row = (*m)[i]
		}
		if row == nil {
			var err error
			row, err = u.getFilterMapRow(rp.mapIndex, uint32(i))
			if err != nil {
				return nil, err
			}
		}
		rp.rowLength[i] = uint(len(row))
	}
	return rp, nil
}

// getRevertPoint retrieves the latest revert point at or before the given block
// number from memory cache or from the database if available. If no such revert
// point is available then it returns no result and no error.
func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
	if blockNumber > f.headBlockNumber {
		blockNumber = f.headBlockNumber
	}
	if rp := f.revertPoints[blockNumber]; rp != nil {
		return rp, nil
	}
	blockNumber -= blockNumber % revertPointFrequency
	rps, err := rawdb.ReadRevertPoint(f.db, blockNumber)
	if err != nil {
		return nil, err
	}
	if rps == nil {
		return nil, nil
	}
	if len(rps.RowLength) != mapHeight {
		return nil, errors.New("invalid number of rows in stored revert point")
	}
	rp := &revertPoint{
		blockNumber: blockNumber,
		blockHash:   rps.BlockHash,
		mapIndex:    rps.MapIndex,
	}
	copy(rp.rowLength[:], rps.RowLength)
	return rp, nil
}

// revertTo reverts the log index to the given revert point.
func (f *FilterMaps) revertTo(rp *revertPoint) error {
	batch := f.db.NewBatch()
	afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
	if rp.mapIndex >= afterLastMap {
		return errors.New("cannot revert (head map behind revert point)")
	}
	lvPointer := uint64(rp.mapIndex) << logValuesPerMap
	for rowIndex, rowLen := range rp.rowLength[:] {
		rowIndex := uint32(rowIndex)
		row, err := f.getFilterMapRow(rp.mapIndex, rowIndex)
		if err != nil {
			return err
		}
		if uint(len(row)) < rowLen {
			return errors.New("cannot revert (row too short)")
		}
		if uint(len(row)) > rowLen {
			f.storeFilterMapRow(batch, rp.mapIndex, rowIndex, row[:rowLen])
		}
		for mapIndex := rp.mapIndex + 1; mapIndex < afterLastMap; mapIndex++ {
			f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
		}
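		// summing the retained row lengths reconstructs the number of log
		// values marked on the partially kept map, yielding the new head
		// log value pointer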
		lvPointer += uint64(rowLen)
	}
	for mapIndex := rp.mapIndex + 1; mapIndex < afterLastMap; mapIndex++ {
		f.deleteMapBlockPtr(batch, mapIndex)
	}
	for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ {
		f.deleteBlockLvPointer(batch, blockNumber)
	}
	newRange := f.filterMapsRange
	newRange.headLvPointer = lvPointer
	newRange.headBlockNumber = rp.blockNumber
	newRange.headBlockHash = rp.blockHash
	f.setRange(batch, newRange)
	if err := batch.Write(); err != nil {
		log.Crit("Could not write update batch", "error", err)
	}
	return nil
}

@ -0,0 +1,500 @@

package filtermaps

import (
	"context"
	"math"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// MatcherBackend defines the functions required for searching in the log index
// data structure. It is currently implemented by FilterMapsMatcherBackend but
// once EIP-7745 is implemented and active, these functions can also be trustlessly
// served by a remote prover.
type MatcherBackend interface {
	GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
	GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
	GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
}
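
// Usage sketch (assuming a FilterMaps instance fm; addr, topic0, firstBlock
// and lastBlock are placeholders): the matcher backend is obtained by
// converting the indexer, then searched over a block range:
//
//	backend := (*FilterMapsMatcherBackend)(fm)
//	logs, err := GetPotentialMatches(ctx, backend, firstBlock, lastBlock,
//		[]common.Address{addr}, [][]common.Hash{{topic0}})
//
// The returned logs may still contain false positives and have to be checked
// against the exact filter criteria by the caller.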

// GetPotentialMatches returns a list of logs that are potential matches for the
// given filter criteria. Note that the returned list may still contain false
// positives.
//TODO add protection against reorgs during search
func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
	// find the log value index range to search
	firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
	if err != nil {
		return nil, err
	}
	lastIndex, err := backend.GetBlockLvPointer(ctx, lastBlock+1)
	if err != nil {
		return nil, err
	}
	if lastIndex > 0 {
		lastIndex--
	}
	firstMap, lastMap := uint32(firstIndex>>logValuesPerMap), uint32(lastIndex>>logValuesPerMap)
	firstEpoch, lastEpoch := firstMap>>logMapsPerEpoch, lastMap>>logMapsPerEpoch

	// build matcher according to the given filter criteria
	matchers := make([]matcher, len(topics)+1)
	// matchAddress signals a match when there is a match for any of the given
	// addresses.
	// If the list of addresses is empty then it creates a "wild card" matcher
	// that signals every index as a potential match.
	matchAddress := make(matchAny, len(addresses))
	for i, address := range addresses {
		matchAddress[i] = &singleMatcher{backend: backend, value: addressValue(address)}
	}
	matchers[0] = matchAddress
	for i, topicList := range topics {
		// matchTopic signals a match when there is a match for any of the topics
		// specified for the given position (topicList).
		// If topicList is empty then it creates a "wild card" matcher that signals
		// every index as a potential match.
		matchTopic := make(matchAny, len(topicList))
		for j, topic := range topicList {
			matchTopic[j] = &singleMatcher{backend: backend, value: topicValue(topic)}
		}
		matchers[i+1] = matchTopic
	}
	// matcher is the final sequence matcher that signals a match when all underlying
	// matchers signal a match for consecutive log value indices.
	matcher := newMatchSequence(matchers)

	// processEpoch returns the potentially matching logs from the given epoch.
	processEpoch := func(epochIndex uint32) ([]*types.Log, error) {
		var logs []*types.Log
		// create a list of map indices to process
		fm, lm := epochIndex<<logMapsPerEpoch, (epochIndex+1)<<logMapsPerEpoch-1
		if fm < firstMap {
			fm = firstMap
		}
		if lm > lastMap {
			lm = lastMap
		}
		mapIndices := make([]uint32, lm+1-fm)
		for i := range mapIndices {
			mapIndices[i] = fm + uint32(i)
		}
		// find potential matches
		matches, err := matcher.getMatches(ctx, mapIndices)
		if err != nil {
			return logs, err
		}
		// get the actual logs located at the matching log value indices
		for _, m := range matches {
			mlogs, err := getLogsFromMatches(ctx, backend, firstIndex, lastIndex, m)
			if err != nil {
				return logs, err
			}
			logs = append(logs, mlogs...)
		}
		return logs, nil
	}

	type task struct {
		epochIndex uint32
		logs       []*types.Log
		err        error
		done       chan struct{}
	}

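	// Dispatch epochs to a small fixed pool of workers and collect the
	// per-epoch results strictly in epoch order so that the returned log list
	// stays sorted by log value index.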
taskCh := make(chan *task) |
||||
var wg sync.WaitGroup |
||||
defer func() { |
||||
close(taskCh) |
||||
wg.Wait() |
||||
}() |
||||
|
||||
worker := func() { |
||||
for task := range taskCh { |
||||
if task == nil { |
||||
break |
||||
} |
||||
task.logs, task.err = processEpoch(task.epochIndex) |
||||
close(task.done) |
||||
} |
||||
wg.Done() |
||||
return |
||||
} |
||||
|
||||
for i := 0; i < 4; i++ { |
||||
wg.Add(1) |
||||
go worker() |
||||
} |
||||
|
||||
var logs []*types.Log |
||||
// startEpoch is the next task to send whenever a worker can accept it.
|
||||
// waitEpoch is the next task we are waiting for to finish in order to append
|
||||
// results in the correct order.
|
||||
startEpoch, waitEpoch := firstEpoch, firstEpoch |
||||
tasks := make(map[uint32]*task) |
||||
tasks[startEpoch] = &task{epochIndex: startEpoch, done: make(chan struct{})} |
||||
for waitEpoch <= lastEpoch { |
||||
select { |
||||
case taskCh <- tasks[startEpoch]: |
||||
startEpoch++ |
||||
if startEpoch <= lastEpoch { |
||||
if tasks[startEpoch] == nil { |
||||
tasks[startEpoch] = &task{epochIndex: startEpoch, done: make(chan struct{})} |
||||
} |
||||
} |
||||
case <-tasks[waitEpoch].done: |
||||
logs = append(logs, tasks[waitEpoch].logs...) |
||||
if err := tasks[waitEpoch].err; err != nil { |
||||
return logs, err |
||||
} |
||||
delete(tasks, waitEpoch) |
||||
waitEpoch++ |
||||
if waitEpoch <= lastEpoch { |
||||
if tasks[waitEpoch] == nil { |
||||
tasks[waitEpoch] = &task{epochIndex: waitEpoch, done: make(chan struct{})} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
return logs, nil |
||||
} |
||||
|
||||
// getLogsFromMatches returns the list of potentially matching logs located at
|
||||
// the given list of matching log indices. Matches outside the firstIndex to
|
||||
// lastIndex range are not returned.
|
||||
func getLogsFromMatches(ctx context.Context, backend MatcherBackend, firstIndex, lastIndex uint64, matches potentialMatches) ([]*types.Log, error) { |
||||
var logs []*types.Log |
||||
for _, match := range matches { |
||||
if match < firstIndex || match > lastIndex { |
||||
continue |
||||
} |
||||
log, err := backend.GetLogByLvIndex(ctx, match) |
||||
if err != nil { |
||||
return logs, err |
||||
} |
||||
if log != nil { |
||||
logs = append(logs, log) |
||||
} |
||||
} |
||||
return logs, nil |
||||
} |
||||
|
||||
// matcher interface is defined so that individual address/topic matchers can be
|
||||
// combined into a pattern matcher (see matchAny and matchSequence).
|
||||
type matcher interface { |
||||
// getMatches takes a list of map indices and returns an equal number of
|
||||
// potentialMatches, one for each corresponding map index.
|
||||
// Note that the map index list is typically a list of the potentially
|
||||
// interesting maps from an epoch, plus sometimes the first map of the next
|
||||
// epoch if it is required for sequence matching.
|
||||
getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) |
||||
} |

// singleMatcher implements matcher by returning matches for a single log value hash.
type singleMatcher struct {
	backend MatcherBackend
	value   common.Hash
}

// getMatches implements matcher
func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
	results := make([]potentialMatches, len(mapIndices))
	for i, mapIndex := range mapIndices {
		filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, s.value))
		if err != nil {
			return nil, err
		}
		results[i] = filterRow.potentialMatches(mapIndex, s.value)
	}
	return results, nil
}
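
// Note that the row in which a log value is marked depends on the epoch: the
// call above derives the epoch index as mapIndex>>logMapsPerEpoch, so with
// logMapsPerEpoch = 6, map index 130 belongs to epoch 130>>6 = 2 and the row
// returned by rowIndex(2, s.value) is used for that map.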

// matchAny combines a set of matchers and returns a match for every position
// where any of the underlying matchers signaled a match. A zero-length matchAny
// acts as a "wild card" that signals a potential match at every position.
type matchAny []matcher

// getMatches implements matcher
func (m matchAny) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
	if len(m) == 0 {
		// return "wild card" results (potentialMatches(nil) is interpreted as a
		// potential match at every log value index of the map).
		return make([]potentialMatches, len(mapIndices)), nil
	}
	if len(m) == 1 {
		return m[0].getMatches(ctx, mapIndices)
	}
	matches := make([][]potentialMatches, len(m))
	for i, matcher := range m {
		var err error
		if matches[i], err = matcher.getMatches(ctx, mapIndices); err != nil {
			return nil, err
		}
	}
	results := make([]potentialMatches, len(mapIndices))
	merge := make([]potentialMatches, len(m))
	for i := range results {
		for j := range merge {
			merge[j] = matches[j][i]
		}
		results[i] = mergeResults(merge)
	}
	return results, nil
}
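
// Worked example: if for some map one sub-matcher yields {5, 9} and another
// yields {7, 9}, the merged result for that map is {5, 7, 9}; if any
// sub-matcher yields nil (wild card), the merged result is nil as well.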

// mergeResults merges multiple lists of matches into a single one, preserving
// ascending order and filtering out any duplicates.
func mergeResults(results []potentialMatches) potentialMatches {
	if len(results) == 0 {
		return nil
	}
	var sumLen int
	for _, res := range results {
		if res == nil {
			// nil is a wild card; all indices in map range are potential matches
			return nil
		}
		sumLen += len(res)
	}
	merged := make(potentialMatches, 0, sumLen)
	for {
		best := -1
		for i, res := range results {
			if len(res) == 0 {
				continue
			}
			if best < 0 || res[0] < results[best][0] {
				best = i
			}
		}
		if best < 0 {
			return merged
		}
		if len(merged) == 0 || results[best][0] > merged[len(merged)-1] {
			merged = append(merged, results[best][0])
		}
		results[best] = results[best][1:]
	}
}
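
// Worked example: mergeResults([]potentialMatches{{1, 5, 9}, {3, 5, 7}})
// yields {1, 3, 5, 7, 9} (ascending, with the duplicate 5 emitted once),
// while mergeResults([]potentialMatches{{1, 5, 9}, nil}) yields nil because
// the nil wild card already covers every index of the map.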

// matchSequence combines two matchers, a "base" and a "next" matcher with a
// positive integer offset so that the resulting matcher signals a match at log
// value index X when the base matcher returns a match at X and the next matcher
// gives a match at X+offset. Note that matchSequence can be used recursively to
// detect any log value sequence.
type matchSequence struct {
	base, next matcher
	offset     uint64
	// *EmptyRate == totalCount << 32 + emptyCount (atomically accessed)
	baseEmptyRate, nextEmptyRate uint64
}
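
// The empty rate fields pack two 32-bit counters into a single uint64 so that
// both can be read and updated with one atomic operation:
//
//	v := atomic.LoadUint64(&m.baseEmptyRate)
//	totalCount, emptyCount := v>>32, uint64(uint32(v))
//
// Adding 0x100000001 increments both counters while adding 0x100000000
// increments only totalCount; this is exactly how getMatches below maintains
// them.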

// newMatchSequence creates a recursive sequence matcher from a list of underlying
// matchers. The resulting matcher signals a match at log value index X when each
// underlying matcher matchers[i] returns a match at X+i.
func newMatchSequence(matchers []matcher) matcher {
	if len(matchers) == 0 {
		panic("zero length sequence matchers are not allowed")
	}
	if len(matchers) == 1 {
		return matchers[0]
	}
	return &matchSequence{
		base:   newMatchSequence(matchers[:len(matchers)-1]),
		next:   matchers[len(matchers)-1],
		offset: uint64(len(matchers) - 1),
	}
}
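
// For example, newMatchSequence([]matcher{m0, m1, m2}) builds
//
//	&matchSequence{
//		base:   &matchSequence{base: m0, next: m1, offset: 1},
//		next:   m2,
//		offset: 2,
//	}
//
// which signals a match at index X exactly when m0 matches at X, m1 at X+1
// and m2 at X+2.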

// getMatches implements matcher
func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
	// decide whether to evaluate base or next matcher first
	baseEmptyRate := atomic.LoadUint64(&m.baseEmptyRate)
	nextEmptyRate := atomic.LoadUint64(&m.nextEmptyRate)
	baseTotal, baseEmpty := baseEmptyRate>>32, uint64(uint32(baseEmptyRate))
	nextTotal, nextEmpty := nextEmptyRate>>32, uint64(uint32(nextEmptyRate))
	baseFirst := baseEmpty*nextTotal >= nextEmpty*baseTotal/2
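	// The inequality above prefers base first mode unless the next matcher
	// produces empty results more than twice as often as the base matcher.
	// Hypothetical counts for illustration: with baseTotal = nextTotal = 100,
	// baseEmpty = 60 and nextEmpty = 90 we get 60*100 >= 90*100/2, so the base
	// matcher is evaluated first; with baseEmpty = 40 the check fails and the
	// next matcher goes first. Evaluating the "emptier" side first allows more
	// of the other side's map requests to be skipped.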

	var (
		baseRes, nextRes []potentialMatches
		baseIndices      []uint32
	)
	if baseFirst {
		// base first mode; request base matcher
		baseIndices = mapIndices
		var err error
		baseRes, err = m.base.getMatches(ctx, baseIndices)
		if err != nil {
			return nil, err
		}
	}

	// determine set of indices to request from next matcher
	nextIndices := make([]uint32, 0, len(mapIndices)*3/2)
	lastAdded := uint32(math.MaxUint32)
	for i, mapIndex := range mapIndices {
		if baseFirst && baseRes[i] != nil && len(baseRes[i]) == 0 {
			// do not request map index from next matcher if no results from base matcher
			continue
		}
		if lastAdded != mapIndex {
			nextIndices = append(nextIndices, mapIndex)
			lastAdded = mapIndex
		}
		if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<logValuesPerMap)-m.offset {
			nextIndices = append(nextIndices, mapIndex+1)
			lastAdded = mapIndex + 1
		}
	}

	if len(nextIndices) != 0 {
		// request next matcher
		var err error
		nextRes, err = m.next.getMatches(ctx, nextIndices)
		if err != nil {
			return nil, err
		}
	}

	if !baseFirst {
		// next first mode; determine set of indices to request from base matcher
		baseIndices = make([]uint32, 0, len(mapIndices))
		var nextPtr int
		for _, mapIndex := range mapIndices {
			// find corresponding results in nextRes
			for nextPtr+1 < len(nextIndices) && nextIndices[nextPtr] < mapIndex {
				nextPtr++
			}
			if nextPtr+1 >= len(nextIndices) {
				break
			}
			if nextIndices[nextPtr] != mapIndex || nextIndices[nextPtr+1] != mapIndex+1 {
				panic("invalid nextIndices")
			}
			next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1]
			if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<logValuesPerMap)+m.offset) ||
				next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<logValuesPerMap)+m.offset) {
				baseIndices = append(baseIndices, mapIndex)
			}
		}
		if len(baseIndices) != 0 {
			// request base matcher
			var err error
			baseRes, err = m.base.getMatches(ctx, baseIndices)
			if err != nil {
				return nil, err
			}
		}
	}

	// all potential matches of base and next matchers obtained, update empty rates
	for _, res := range baseRes {
		if res != nil && len(res) == 0 {
			atomic.AddUint64(&m.baseEmptyRate, 0x100000001)
		} else {
			atomic.AddUint64(&m.baseEmptyRate, 0x100000000)
		}
	}
	for _, res := range nextRes {
		if res != nil && len(res) == 0 {
			atomic.AddUint64(&m.nextEmptyRate, 0x100000001)
		} else {
			atomic.AddUint64(&m.nextEmptyRate, 0x100000000)
		}
	}

	// define iterator functions to find base/next matcher results by map index
	var basePtr int
	baseResult := func(mapIndex uint32) potentialMatches {
		for basePtr < len(baseIndices) && baseIndices[basePtr] <= mapIndex {
			if baseIndices[basePtr] == mapIndex {
				return baseRes[basePtr]
			}
			basePtr++
		}
		return noMatches
	}
	var nextPtr int
	nextResult := func(mapIndex uint32) potentialMatches {
		for nextPtr < len(nextIndices) && nextIndices[nextPtr] <= mapIndex {
			if nextIndices[nextPtr] == mapIndex {
				return nextRes[nextPtr]
			}
			nextPtr++
		}
		return noMatches
	}

	// match corresponding base and next matcher results
	results := make([]potentialMatches, len(mapIndices))
	for i, mapIndex := range mapIndices {
		results[i] = matchSequenceResults(mapIndex, m.offset, baseResult(mapIndex), nextResult(mapIndex), nextResult(mapIndex+1))
	}
	return results, nil
}

// matchSequenceResults returns a list of sequence matches for the given mapIndex
// and offset based on the base matcher's results at mapIndex and the next matcher's
// results at mapIndex and mapIndex+1. Note that fetching nextNextRes can be
// skipped and an empty list substituted for it if baseRes has no potential
// matches that could form a sequence match with anything in nextNextRes.
func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
	if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
		// if nextRes is a wild card or baseRes is empty then the sequence matcher
		// result equals baseRes.
		return baseRes
	}
	if len(nextRes) > 0 {
		// discard items from nextRes whose corresponding base matcher results
		// with the negative offset applied would be located at mapIndex-1.
		start := 0
		for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<logValuesPerMap+offset {
			start++
		}
		nextRes = nextRes[start:]
	}
	if len(nextNextRes) > 0 {
		// discard items from nextNextRes whose corresponding base matcher results
		// with the negative offset applied would still be located at mapIndex+1.
		stop := 0
		for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<logValuesPerMap+offset {
			stop++
		}
		nextNextRes = nextNextRes[:stop]
	}
	maxLen := len(nextRes) + len(nextNextRes)
	if maxLen == 0 {
		return nextRes
	}
	if len(baseRes) < maxLen {
		maxLen = len(baseRes)
	}
	// iterate through baseRes, nextRes and nextNextRes and collect matching results.
	matchedRes := make(potentialMatches, 0, maxLen)
	for _, nextRes := range []potentialMatches{nextRes, nextNextRes} {
		if baseRes != nil {
			for len(nextRes) > 0 && len(baseRes) > 0 {
				if nextRes[0] > baseRes[0]+offset {
					baseRes = baseRes[1:]
				} else if nextRes[0] < baseRes[0]+offset {
					nextRes = nextRes[1:]
				} else {
					matchedRes = append(matchedRes, baseRes[0])
					baseRes = baseRes[1:]
					nextRes = nextRes[1:]
				}
			}
		} else {
			// baseRes is a wild card so just return next matcher results with
			// negative offset.
			for len(nextRes) > 0 {
				matchedRes = append(matchedRes, nextRes[0]-offset)
				nextRes = nextRes[1:]
			}
		}
	}
	return matchedRes
}
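
// Worked example: with logValuesPerMap = 16, map 1 covers log value indices
// 65536..131071. For mapIndex = 1 and offset = 1, a base match at 65540 is
// confirmed by a next matcher result at 65541 (found in nextRes), while a
// base match at 131071 needs a next matcher result at 131072, which falls
// into map 2 and can therefore only come from nextNextRes; both sequence
// matches are reported at their base positions, 65540 and 131071.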