core/filtermaps: simplified locking scheme

pull/30370/head
Zsolt Felfoldi 2 months ago
parent 989f2c2a3d
commit 5f3903c869
  1. 64
      core/filtermaps/filtermaps.go
  2. 94
      core/filtermaps/indexer.go
  3. 21
      core/filtermaps/indexer_test.go
  4. 40
      core/filtermaps/matcher_backend.go
  5. 4
      core/filtermaps/matcher_test.go

@ -57,24 +57,32 @@ type FilterMaps struct {
chain blockchain
matcherSyncCh chan *FilterMapsMatcherBackend
// db and range are only modified by indexer under write lock; indexer can
// read them without a lock while matchers can access them under read lock
lock sync.RWMutex
db ethdb.KeyValueStore
filterMapsRange
matchers map[*FilterMapsMatcherBackend]struct{}
// fields written by the indexer and read by matcher backend. Indexer can
// read them without a lock and write them under indexLock write lock.
// Matcher backend can read them under indexLock read lock.
indexLock sync.RWMutex
filterMapsRange
// filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified
// while updating the structure. Note that the set of cached maps depends
// only on filterMapsRange and rows of other maps are not cached here.
filterMapLock sync.Mutex
filterMapCache map[uint32]filterMap
// also accessed by indexer and matcher backend but no locking needed.
blockPtrCache *lru.Cache[uint32, uint64]
lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint
// the matchers set and the fields of FilterMapsMatcherBackend instances are
// read and written both by exported functions and the indexer.
// Note that if both indexLock and matchersLock needs to be locked then
// indexLock should be locked first.
matchersLock sync.Mutex
matchers map[*FilterMapsMatcherBackend]struct{}
// fields only accessed by the indexer (no mutex required).
revertPoints map[uint64]*revertPoint
startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool
startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time
lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time
@ -177,16 +185,16 @@ func (f *FilterMaps) Stop() {
// reset un-initializes the FilterMaps structure and removes all related data from
// the database. The function returns true if everything was successfully removed.
func (f *FilterMaps) reset() bool {
f.lock.Lock()
f.indexLock.Lock()
f.filterMapsRange = filterMapsRange{}
f.filterMapCache = make(map[uint32]filterMap)
f.revertPoints = make(map[uint64]*revertPoint)
f.blockPtrCache.Purge()
f.lvPointerCache.Purge()
f.indexLock.Unlock()
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
f.lock.Unlock()
return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database")
}
@ -240,7 +248,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
}
// setRange updates the covered range and also adds the changes to the given batch.
// Note that this function assumes that the read/write lock is being held.
// Note that this function assumes that the index write lock is being held.
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRange) {
f.filterMapsRange = newRange
rs := rawdb.FilterMapsRange{
@ -259,14 +267,11 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRan
// updateMapCache updates the maps covered by the filterMapCache according to the
// covered range.
// Note that this function assumes that the read lock is being held.
// Note that this function assumes that the index write lock is being held.
func (f *FilterMaps) updateMapCache() {
if !f.initialized {
return
}
f.filterMapLock.Lock()
defer f.filterMapLock.Unlock()
newFilterMapCache := make(map[uint32]filterMap)
firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>f.logValuesPerMap), uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap)
headCacheFirst := firstMap + 1
@ -294,7 +299,8 @@ func (f *FilterMaps) updateMapCache() {
// Note that this function assumes that the log index structure is consistent
// with the canonical chain at the point where the given log value index points.
// If this is not the case then an invalid result or an error may be returned.
// Note that this function assumes that the read lock is being held.
// Note that this function assumes that the indexer read lock is being held when
// called from outside the updateLoop goroutine.
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
if lvIndex < f.tailBlockLvPointer || lvIndex > f.headLvPointer {
return nil, nil
@ -361,10 +367,9 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
// then a non-nil zero length row is returned.
// Note that the returned slices should not be modified, they should be copied
// on write.
// Note that the function assumes that the indexLock is not being held (should
// only be called from the updateLoop goroutine).
func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, error) {
f.filterMapLock.Lock()
defer f.filterMapLock.Unlock()
fm := f.filterMapCache[mapIndex]
if fm != nil && fm[rowIndex] != nil {
return fm[rowIndex], nil
@ -374,19 +379,31 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, erro
return nil, err
}
if fm != nil {
f.indexLock.Lock()
fm[rowIndex] = FilterRow(row)
f.indexLock.Unlock()
}
return FilterRow(row), nil
}
// getFilterMapRowUncached returns the given row of the given map. If the row is
// empty then a non-nil zero length row is returned.
// This function bypasses the memory cache which is mostly useful for processing
// the head and tail maps during the indexing process and should be used by the
// matcher backend which rarely accesses the same row twice and therefore does
// not really benefit from caching anyways.
// The function is unaffected by the indexLock mutex.
func (f *FilterMaps) getFilterMapRowUncached(mapIndex, rowIndex uint32) (FilterRow, error) {
row, err := rawdb.ReadFilterMapRow(f.db, f.mapRowIndex(mapIndex, rowIndex))
return FilterRow(row), err
}
// storeFilterMapRow stores a row at the given row index of the given map and also
// caches it in filterMapCache if the given map is cached.
// Note that empty rows are not stored in the database and therefore there is no
// separate delete function; deleting a row is the same as storing an empty row.
// Note that this function assumes that the indexer write lock is being held.
func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uint32, row FilterRow) {
f.filterMapLock.Lock()
defer f.filterMapLock.Unlock()
if fm := f.filterMapCache[mapIndex]; fm != nil {
fm[rowIndex] = row
}
@ -407,7 +424,8 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
// getBlockLvPointer returns the starting log value index where the log values
// generated by the given block are located. If blockNumber is beyond the current
// head then the first unoccupied log value index is returned.
// Note that this function assumes that the read lock is being held.
// Note that this function assumes that the indexer read lock is being held when
// called from outside the updateLoop goroutine.
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
if blockNumber > f.headBlockNumber {
return f.headLvPointer, nil

@ -46,9 +46,9 @@ func (f *FilterMaps) updateLoop() {
return
}
f.lock.Lock()
f.indexLock.Lock()
f.updateMapCache()
f.lock.Unlock()
f.indexLock.Unlock()
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
f.revertPoints[rp.blockNumber] = rp
} else {
@ -61,11 +61,10 @@ func (f *FilterMaps) updateLoop() {
head = f.chain.CurrentBlock()
stop bool
syncMatcher *FilterMapsMatcherBackend
fmr = f.getRange()
)
matcherSync := func() {
if syncMatcher != nil && fmr.initialized && fmr.headBlockHash == head.Hash() {
if syncMatcher != nil && f.initialized && f.headBlockHash == head.Hash() {
syncMatcher.synced(head)
syncMatcher = nil
}
@ -92,7 +91,7 @@ func (f *FilterMaps) updateLoop() {
stop = true
case ch := <-f.waitIdleCh:
head = f.chain.CurrentBlock()
if head.Hash() == f.getRange().headBlockHash {
if head.Hash() == f.headBlockHash {
ch <- true
continue loop
}
@ -110,27 +109,24 @@ func (f *FilterMaps) updateLoop() {
return
}
}
fmr = f.getRange()
for !stop {
if !fmr.initialized {
if !f.initialized {
if !f.tryInit(head) {
return
}
fmr = f.getRange()
if !fmr.initialized {
if !f.initialized {
wait()
continue
}
}
// log index is initialized
if fmr.headBlockHash != head.Hash() {
if f.headBlockHash != head.Hash() {
if !f.tryUpdateHead(head) {
return
}
fmr = f.getRange()
if fmr.headBlockHash != head.Hash() {
if f.headBlockHash != head.Hash() {
wait()
continue
}
@ -151,8 +147,8 @@ func (f *FilterMaps) updateLoop() {
head = f.chain.CurrentBlock()
}
// stop if there is a new chain head (always prioritize head updates)
return fmr.headBlockHash != head.Hash()
}) && fmr.headBlockHash == head.Hash() {
return f.headBlockHash != head.Hash() || syncMatcher != nil
}) && f.headBlockHash == head.Hash() {
// if tail processing reached its final state and there is no new
// head then wait for more events
wait()
@ -176,14 +172,6 @@ func (f *FilterMaps) WaitIdle() {
}
}
// getRange returns the current filterMapsRange.
func (f *FilterMaps) getRange() filterMapsRange {
f.lock.RLock()
defer f.lock.RUnlock()
return f.filterMapsRange
}
// tryInit attempts to initialize the log index structure.
// Returns false if indexer was stopped during a database reset. In this case the
// indexer should exit and remaining parts of the old database will be removed
@ -215,17 +203,16 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
// at next startup.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
defer func() {
fmr := f.getRange()
if newHead.Hash() == fmr.headBlockHash {
if newHead.Hash() == f.headBlockHash {
if f.loggedHeadUpdate {
log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate,
"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
f.loggedHeadUpdate, f.startHeadUpdate = false, false
}
} else {
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
"remaining", newHead.Number.Uint64()-fmr.headBlockNumber,
log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate,
"remaining", newHead.Number.Uint64()-f.headBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
f.loggedHeadUpdate = true
f.lastLogHeadUpdate = time.Now()
@ -238,7 +225,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
f.lastLogHeadUpdate = time.Now()
f.startedHeadUpdate = f.lastLogHeadUpdate
f.startHeadUpdate = true
f.ptrHeadUpdate = f.getRange().headBlockNumber
f.ptrHeadUpdate = f.headBlockNumber
}
// iterate back from new head until the log index head or a revert point and
// collect headers of blocks to be added
@ -321,7 +308,7 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
tailTarget = headNum + 1 - f.history
}
}
tailNum := f.getRange().tailBlockNumber
tailNum := f.tailBlockNumber
if tailNum > tailTarget {
if !f.tryExtendTail(tailTarget, stopFn) {
return false
@ -337,23 +324,21 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
// indexed history length is achieved. Returns true if finished.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
defer func() {
fmr := f.getRange()
if fmr.tailBlockNumber <= tailTarget {
if f.tailBlockNumber <= tailTarget {
if f.loggedTailExtend {
log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
log.Info("Reverse log indexing finished", "history", f.headBlockNumber+1-f.tailBlockNumber,
"processed", f.ptrTailExtend-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
f.loggedTailExtend = false
}
}
}()
fmr := f.getRange()
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
number, parentHash := f.tailBlockNumber, f.tailParentHash
if !f.loggedTailExtend {
f.lastLogTailExtend = time.Now()
f.startedTailExtend = f.lastLogTailExtend
f.ptrTailExtend = fmr.tailBlockNumber
f.ptrTailExtend = f.tailBlockNumber
}
update := f.newUpdateBatch()
@ -400,20 +385,18 @@ func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool
if !f.loggedTailUnindex {
f.lastLogTailUnindex = time.Now()
f.startedTailUnindex = f.lastLogTailUnindex
f.ptrTailUnindex = f.getRange().tailBlockNumber
f.ptrTailUnindex = f.tailBlockNumber
}
for {
if f.unindexTailEpoch(tailTarget) {
fmr := f.getRange()
log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
log.Info("Log unindexing finished", "history", f.headBlockNumber+1-f.tailBlockNumber,
"removed", f.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
f.loggedTailUnindex = false
return true
}
if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex {
fmr := f.getRange()
log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber,
log.Info("Log unindexing in progress", "history", f.headBlockNumber+1-f.tailBlockNumber,
"removed", f.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-f.tailBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex)))
f.loggedTailUnindex = true
f.lastLogTailUnindex = time.Now()
@ -427,11 +410,9 @@ func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool
// unindexTailEpoch unindexes at most an epoch of tail log index data until the
// desired tail target is reached.
func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
f.lock.Lock()
oldRange := f.filterMapsRange
newTailMap, changed := f.unindexTailPtr(tailTarget)
newRange := f.filterMapsRange
f.lock.Unlock()
if !changed {
return true // nothing more to do
@ -441,6 +422,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
oldTailMap := uint32(oldRange.tailLvPointer >> f.logValuesPerMap)
// remove map data [oldTailMap, newTailMap) and block data
// [oldRange.tailBlockNumber, newRange.tailBlockNumber)
f.indexLock.Lock()
batch := f.db.NewBatch()
for blockNumber := oldRange.tailBlockNumber; blockNumber < newRange.tailBlockNumber; blockNumber++ {
f.deleteBlockLvPointer(batch, blockNumber)
@ -459,14 +441,15 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
newRange.tailLvPointer = uint64(newTailMap) << f.logValuesPerMap
if newRange.tailLvPointer > newRange.tailBlockLvPointer {
log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", newRange.tailLvPointer, "tailBlockLvPointer", newRange.tailBlockLvPointer)
f.indexLock.Unlock()
return
}
f.lock.Lock()
f.setRange(batch, newRange)
f.indexLock.Unlock()
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
f.lock.Unlock()
return
}
@ -539,9 +522,6 @@ type updateBatch struct {
// newUpdateBatch creates a new updateBatch.
func (f *FilterMaps) newUpdateBatch() *updateBatch {
f.lock.RLock()
defer f.lock.RUnlock()
return &updateBatch{
f: f,
filterMapsRange: f.filterMapsRange,
@ -555,8 +535,7 @@ func (f *FilterMaps) newUpdateBatch() *updateBatch {
// applyUpdateBatch writes creates a batch and writes all changes to the database
// and also updates the in-memory representations of log index data.
func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
f.lock.Lock()
defer f.lock.Unlock()
f.indexLock.Lock()
batch := f.db.NewBatch()
// write or remove block to log value index pointers
@ -615,6 +594,8 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
}
// update filterMapsRange
f.setRange(batch, u.filterMapsRange)
f.indexLock.Unlock()
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
@ -849,9 +830,6 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
// number from memory cache or from the database if available. If no such revert
// point is available then it returns no result and no error.
func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
f.lock.RLock()
defer f.lock.RUnlock()
if blockNumber > f.headBlockNumber {
blockNumber = f.headBlockNumber
}
@ -879,9 +857,6 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
// revertTo reverts the log index to the given revert point.
func (f *FilterMaps) revertTo(rp *revertPoint) error {
f.lock.Lock()
defer f.lock.Unlock()
batch := f.db.NewBatch()
afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap)
if rp.mapIndex > afterLastMap {
@ -918,7 +893,10 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
newRange.headLvPointer = lvPointer
newRange.headBlockNumber = rp.blockNumber
newRange.headBlockHash = rp.blockHash
f.indexLock.Lock()
f.setRange(batch, newRange)
f.indexLock.Unlock()
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}

@ -79,14 +79,13 @@ func TestIndexerRandomRange(t *testing.T) {
ts.chain.setCanonicalChain(forks[fork][:head+1])
}
ts.fm.WaitIdle()
fmr := ts.fm.getRange()
if noHistory {
if fmr.initialized {
if ts.fm.initialized {
t.Fatalf("filterMapsRange initialized while indexing is disabled")
}
continue
}
if !fmr.initialized {
if !ts.fm.initialized {
t.Fatalf("filterMapsRange not initialized while indexing is enabled")
}
var (
@ -99,21 +98,21 @@ func TestIndexerRandomRange(t *testing.T) {
if tail > 0 {
tpHash = forks[fork][tail-1]
}
if fmr.headBlockNumber != uint64(head) || fmr.headBlockHash != forks[fork][head] {
ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], fmr.headBlockNumber, fmr.headBlockHash)
if ts.fm.headBlockNumber != uint64(head) || ts.fm.headBlockHash != forks[fork][head] {
ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], ts.fm.headBlockNumber, ts.fm.headBlockHash)
}
if fmr.tailBlockNumber != uint64(tail) || fmr.tailParentHash != tpHash {
ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, fmr.tailBlockNumber, fmr.tailParentHash)
if ts.fm.tailBlockNumber != uint64(tail) || ts.fm.tailParentHash != tpHash {
ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, ts.fm.tailBlockNumber, ts.fm.tailParentHash)
}
expLvCount := uint64(head+1-tail) * 50
if tail == 0 {
expLvCount -= 50 // no logs in genesis block
}
if fmr.headLvPointer-fmr.tailBlockLvPointer != expLvCount {
ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, fmr.headLvPointer-fmr.tailBlockLvPointer)
if ts.fm.headLvPointer-ts.fm.tailBlockLvPointer != expLvCount {
ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, ts.fm.headLvPointer-ts.fm.tailBlockLvPointer)
}
if fmr.tailBlockLvPointer-fmr.tailLvPointer >= ts.params.valuesPerMap {
ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, fmr.tailBlockLvPointer-fmr.tailLvPointer)
if ts.fm.tailBlockLvPointer-ts.fm.tailLvPointer >= ts.params.valuesPerMap {
ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, ts.fm.tailBlockLvPointer-ts.fm.tailLvPointer)
}
}
}

@ -26,6 +26,7 @@ import (
// FilterMapsMatcherBackend implements MatcherBackend.
type FilterMapsMatcherBackend struct {
f *FilterMaps
// these fields should be accessed under f.matchersLock mutex.
valid bool
firstValid, lastValid uint64
syncCh chan SyncRange
@ -35,8 +36,12 @@ type FilterMapsMatcherBackend struct {
// the active matcher set.
// Note that Close should always be called when the matcher is no longer used.
func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend {
f.lock.Lock()
defer f.lock.Unlock()
f.indexLock.RLock()
f.matchersLock.Lock()
defer func() {
f.matchersLock.Unlock()
f.indexLock.RUnlock()
}()
fm := &FilterMapsMatcherBackend{
f: f,
@ -58,8 +63,8 @@ func (fm *FilterMapsMatcherBackend) GetParams() *Params {
// any SyncLogIndex calls are cancelled.
// Close implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) Close() {
fm.f.lock.Lock()
defer fm.f.lock.Unlock()
fm.f.matchersLock.Lock()
defer fm.f.matchersLock.Unlock()
delete(fm.f.matchers, fm)
}
@ -70,7 +75,7 @@ func (fm *FilterMapsMatcherBackend) Close() {
// on write.
// GetFilterMapRow implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
return fm.f.getFilterMapRow(mapIndex, rowIndex)
return fm.f.getFilterMapRowUncached(mapIndex, rowIndex)
}
// GetBlockLvPointer returns the starting log value index where the log values
@ -78,8 +83,8 @@ func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapInde
// head then the first unoccupied log value index is returned.
// GetBlockLvPointer implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
fm.f.lock.RLock()
defer fm.f.lock.RUnlock()
fm.f.indexLock.RLock()
defer fm.f.indexLock.RUnlock()
return fm.f.getBlockLvPointer(blockNumber)
}
@ -94,8 +99,8 @@ func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, block
// using SyncLogIndex and re-process certain blocks if necessary.
// GetLogByLvIndex implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
fm.f.lock.RLock()
defer fm.f.lock.RUnlock()
fm.f.indexLock.RLock()
defer fm.f.indexLock.RUnlock()
return fm.f.getLogByLvIndex(lvIndex)
}
@ -108,8 +113,12 @@ func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex
// should be passed as a parameter and the existing log index should be consistent
// with that chain.
func (fm *FilterMapsMatcherBackend) synced(head *types.Header) {
fm.f.lock.Lock()
defer fm.f.lock.Unlock()
fm.f.indexLock.RLock()
fm.f.matchersLock.Lock()
defer func() {
fm.f.matchersLock.Unlock()
fm.f.indexLock.RUnlock()
}()
fm.syncCh <- SyncRange{
Head: head,
@ -143,9 +152,9 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
}
// add SyncRange return channel, ensuring that
syncCh := make(chan SyncRange, 1)
fm.f.lock.Lock()
fm.f.matchersLock.Lock()
fm.syncCh = syncCh
fm.f.lock.Unlock()
fm.f.matchersLock.Unlock()
select {
case fm.f.matcherSyncCh <- fm:
@ -167,8 +176,11 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
// valid range with the current indexed range. This function should be called
// whenever a part of the log index has been removed, before adding new blocks
// to it.
// Note that this function assumes that the read lock is being held.
// Note that this function assumes that the index read lock is being held.
func (f *FilterMaps) updateMatchersValidRange() {
f.matchersLock.Lock()
defer f.matchersLock.Unlock()
for fm := range f.matchers {
if !f.initialized {
fm.valid = false

@ -28,11 +28,11 @@ func TestMatcher(t *testing.T) {
ts := newTestSetup(t)
defer ts.close()
ts.chain.addBlocks(1000, 10, 10, 4, true)
ts.chain.addBlocks(100, 10, 10, 4, true)
ts.setHistory(0, false)
ts.fm.WaitIdle()
for i := 0; i < 500; i++ {
for i := 0; i < 5000; i++ {
bhash := ts.chain.canonical[rand.Intn(len(ts.chain.canonical))]
receipts := ts.chain.receipts[bhash]
if len(receipts) == 0 {

Loading…
Cancel
Save