diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 90b6e9eb9b..ed6a00c965 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -49,18 +49,22 @@ type blockchain interface { // without the tree hashing and consensus changes: // https://eips.ethereum.org/EIPS/eip-7745 type FilterMaps struct { - lock sync.RWMutex - db ethdb.KeyValueStore closeCh chan struct{} closeWg sync.WaitGroup history, unindexLimit uint64 noHistory bool - Params - filterMapsRange chain blockchain matcherSyncCh chan *FilterMapsMatcherBackend - matchers map[*FilterMapsMatcherBackend]struct{} + + // db and range are only modified by indexer under write lock; indexer can + // read them without a lock while matchers can access them under read lock + lock sync.RWMutex + db ethdb.KeyValueStore + filterMapsRange + + matchers map[*FilterMapsMatcherBackend]struct{} + // filterMapCache caches certain filter maps (headCacheSize most recent maps // and one tail map) that are expected to be frequently accessed and modified // while updating the structure. 
Note that the set of cached maps depends @@ -71,6 +75,11 @@ type FilterMaps struct { lvPointerCache *lru.Cache[uint64, uint64] revertPoints map[uint64]*revertPoint + startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool + startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time + lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time + ptrHeadUpdate, ptrTailExtend, ptrTailUnindex uint64 + waitIdleCh chan chan bool } @@ -120,13 +129,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist } params.deriveFields() fm := &FilterMaps{ - db: db, - chain: chain, - closeCh: make(chan struct{}), - waitIdleCh: make(chan chan bool), - history: history, - noHistory: noHistory, - Params: params, + db: db, + chain: chain, + closeCh: make(chan struct{}), + waitIdleCh: make(chan chan bool), + history: history, + noHistory: noHistory, + unindexLimit: unindexLimit, + Params: params, filterMapsRange: filterMapsRange{ initialized: rs.Initialized, headLvPointer: rs.HeadLvPointer, @@ -151,13 +161,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist return fm } +// Start starts the indexer. func (f *FilterMaps) Start() { f.closeWg.Add(2) go f.removeBloomBits() go f.updateLoop() } -// Close ensures that the indexer is fully stopped before returning. +// Stop ensures that the indexer is fully stopped before returning. func (f *FilterMaps) Stop() { close(f.closeCh) f.closeWg.Wait() @@ -172,10 +183,10 @@ func (f *FilterMaps) reset() bool { f.revertPoints = make(map[uint64]*revertPoint) f.blockPtrCache.Purge() f.lvPointerCache.Purge() - f.lock.Unlock() // deleting the range first ensures that resetDb will be called again at next // startup and any leftover data will be removed even if it cannot finish now. 
rawdb.DeleteFilterMapsRange(f.db) + f.lock.Unlock() return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database") } diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 0b1db011e8..1703c858fe 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -29,10 +29,11 @@ import ( ) const ( - startLvMap = 1 << 31 // map index assigned to init block - removedPointer = math.MaxUint64 // used in updateBatch to signal removed items - revertPointFrequency = 256 // frequency of revert points in database - cachedRevertPoints = 64 // revert points for most recent blocks in memory + startLvMap = 1 << 31 // map index assigned to init block + removedPointer = math.MaxUint64 // used in updateBatch to signal removed items + revertPointFrequency = 256 // frequency of revert points in database + cachedRevertPoints = 64 // revert points for most recent blocks in memory + logFrequency = time.Second * 8 // log info frequency during long indexing/unindexing process ) // updateLoop initializes and updates the log index structure according to the @@ -44,7 +45,10 @@ func (f *FilterMaps) updateLoop() { f.reset() return } + + f.lock.Lock() f.updateMapCache() + f.lock.Unlock() if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil { f.revertPoints[rp.blockNumber] = rp } else { @@ -198,6 +202,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { log.Error("Could not initialize log index", "error", err) } f.applyUpdateBatch(update) + log.Info("Initialized log index", "head", head.Number.Uint64()) return true } @@ -209,6 +214,32 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { // indexer should exit and remaining parts of the old database will be removed // at next startup. 
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { + defer func() { + fmr := f.getRange() + if newHead.Hash() == fmr.headBlockHash { + if f.loggedHeadUpdate { + log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate, + "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate))) + f.loggedHeadUpdate, f.startHeadUpdate = false, false + } + } else { + if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate { + log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate, + "remaining", newHead.Number.Uint64()-fmr.headBlockNumber, + "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate))) + f.loggedHeadUpdate = true + f.lastLogHeadUpdate = time.Now() + } + } + + }() + + if !f.startHeadUpdate { + f.lastLogHeadUpdate = time.Now() + f.startedHeadUpdate = f.lastLogHeadUpdate + f.startHeadUpdate = true + f.ptrHeadUpdate = f.getRange().headBlockNumber + } // iterate back from new head until the log index head or a revert point and // collect headers of blocks to be added var ( @@ -305,14 +336,41 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool // tryExtendTail attempts to extend the log index backwards until the desired // indexed history length is achieved. Returns true if finished.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { + defer func() { + fmr := f.getRange() + if fmr.tailBlockNumber <= tailTarget { + if f.loggedTailExtend { + log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber, + "processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend))) + f.loggedTailExtend = false + } + } + }() + fmr := f.getRange() number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash + + if !f.loggedTailExtend { + f.lastLogTailExtend = time.Now() + f.startedTailExtend = f.lastLogTailExtend + f.ptrTailExtend = fmr.tailBlockNumber + } + update := f.newUpdateBatch() lastTailEpoch := update.tailEpoch() for number > tailTarget && !stopFn() { if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) + + if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend { + log.Info("Reverse log indexing in progress", "history", update.headBlockNumber+1-update.tailBlockNumber, + "processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget, + "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend))) + f.loggedTailExtend = true + f.lastLogTailExtend = time.Now() + } + update = f.newUpdateBatch() lastTailEpoch = tailEpoch } @@ -339,10 +397,27 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { // tryUnindexTail attempts to prune the log index tail until the desired indexed // history length is achieved. Returns true if finished.
func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool { + if !f.loggedTailUnindex { + f.lastLogTailUnindex = time.Now() + f.startedTailUnindex = f.lastLogTailUnindex + f.ptrTailUnindex = f.getRange().tailBlockNumber + } for { if f.unindexTailEpoch(tailTarget) { + fmr := f.getRange() + log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber, + "removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex))) + f.loggedTailUnindex = false return true } + if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex { + fmr := f.getRange() + log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber, + "removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber, + "elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex))) + f.loggedTailUnindex = true + f.lastLogTailUnindex = time.Now() + } if stopFn() { return false } @@ -402,6 +477,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) { // by updating the tail pointers, except for targetLvPointer which is not changed // yet as it marks the tail of the log index data stored in the database and // therefore should be updated when map data is actually removed. +// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) { // obtain target log value pointer if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber { @@ -542,7 +618,6 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) { if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } - log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer) } // updatedRangeLength returns the lenght of the updated filter map range. diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index 0bc87e1e93..37a51eac48 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -167,6 +167,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange // valid range with the current indexed range. This function should be called // whenever a part of the log index has been removed, before adding new blocks // to it. +// Note that this function assumes that the read lock is being held. func (f *FilterMaps) updateMatchersValidRange() { for fm := range f.matchers { if !f.initialized {