core/filtermaps: nice log info during indexing/unindexing

pull/30370/head
Zsolt Felfoldi 2 months ago
parent e907bc21c5
commit 989f2c2a3d
  1. 39
      core/filtermaps/filtermaps.go
  2. 85
      core/filtermaps/indexer.go
  3. 1
      core/filtermaps/matcher_backend.go

@ -49,18 +49,22 @@ type blockchain interface {
// without the tree hashing and consensus changes: // without the tree hashing and consensus changes:
// https://eips.ethereum.org/EIPS/eip-7745 // https://eips.ethereum.org/EIPS/eip-7745
type FilterMaps struct { type FilterMaps struct {
lock sync.RWMutex
db ethdb.KeyValueStore
closeCh chan struct{} closeCh chan struct{}
closeWg sync.WaitGroup closeWg sync.WaitGroup
history, unindexLimit uint64 history, unindexLimit uint64
noHistory bool noHistory bool
Params Params
filterMapsRange
chain blockchain chain blockchain
matcherSyncCh chan *FilterMapsMatcherBackend matcherSyncCh chan *FilterMapsMatcherBackend
matchers map[*FilterMapsMatcherBackend]struct{}
// db and range are only modified by indexer under write lock; indexer can
// read them without a lock while matchers can access them under read lock
lock sync.RWMutex
db ethdb.KeyValueStore
filterMapsRange
matchers map[*FilterMapsMatcherBackend]struct{}
// filterMapCache caches certain filter maps (headCacheSize most recent maps // filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified // and one tail map) that are expected to be frequently accessed and modified
// while updating the structure. Note that the set of cached maps depends // while updating the structure. Note that the set of cached maps depends
@ -71,6 +75,11 @@ type FilterMaps struct {
lvPointerCache *lru.Cache[uint64, uint64] lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint revertPoints map[uint64]*revertPoint
startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool
startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time
lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time
ptrHeadUpdate, ptrTailExtend, ptrTailUnindex uint64
waitIdleCh chan chan bool waitIdleCh chan chan bool
} }
@ -120,13 +129,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
} }
params.deriveFields() params.deriveFields()
fm := &FilterMaps{ fm := &FilterMaps{
db: db, db: db,
chain: chain, chain: chain,
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
waitIdleCh: make(chan chan bool), waitIdleCh: make(chan chan bool),
history: history, history: history,
noHistory: noHistory, noHistory: noHistory,
Params: params, unindexLimit: unindexLimit,
Params: params,
filterMapsRange: filterMapsRange{ filterMapsRange: filterMapsRange{
initialized: rs.Initialized, initialized: rs.Initialized,
headLvPointer: rs.HeadLvPointer, headLvPointer: rs.HeadLvPointer,
@ -151,13 +161,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
return fm return fm
} }
// Start starts the indexer.
func (f *FilterMaps) Start() { func (f *FilterMaps) Start() {
f.closeWg.Add(2) f.closeWg.Add(2)
go f.removeBloomBits() go f.removeBloomBits()
go f.updateLoop() go f.updateLoop()
} }
// Close ensures that the indexer is fully stopped before returning. // Stop ensures that the indexer is fully stopped before returning.
func (f *FilterMaps) Stop() { func (f *FilterMaps) Stop() {
close(f.closeCh) close(f.closeCh)
f.closeWg.Wait() f.closeWg.Wait()
@ -172,10 +183,10 @@ func (f *FilterMaps) reset() bool {
f.revertPoints = make(map[uint64]*revertPoint) f.revertPoints = make(map[uint64]*revertPoint)
f.blockPtrCache.Purge() f.blockPtrCache.Purge()
f.lvPointerCache.Purge() f.lvPointerCache.Purge()
f.lock.Unlock()
// deleting the range first ensures that resetDb will be called again at next // deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now. // startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db) rawdb.DeleteFilterMapsRange(f.db)
f.lock.Unlock()
return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database") return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database")
} }

@ -29,10 +29,11 @@ import (
) )
const ( const (
startLvMap = 1 << 31 // map index assigned to init block startLvMap = 1 << 31 // map index assigned to init block
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory cachedRevertPoints = 64 // revert points for most recent blocks in memory
logFrequency = time.Second * 8 // log info frequency during long indexing/unindexing process
) )
// updateLoop initializes and updates the log index structure according to the // updateLoop initializes and updates the log index structure according to the
@ -44,7 +45,10 @@ func (f *FilterMaps) updateLoop() {
f.reset() f.reset()
return return
} }
f.lock.Lock()
f.updateMapCache() f.updateMapCache()
f.lock.Unlock()
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil { if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
f.revertPoints[rp.blockNumber] = rp f.revertPoints[rp.blockNumber] = rp
} else { } else {
@ -198,6 +202,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
log.Error("Could not initialize log index", "error", err) log.Error("Could not initialize log index", "error", err)
} }
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
log.Info("Initialized log index", "head", head.Number.Uint64())
return true return true
} }
@ -209,6 +214,32 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
// indexer should exit and remaining parts of the old database will be removed // indexer should exit and remaining parts of the old database will be removed
// at next startup. // at next startup.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
defer func() {
fmr := f.getRange()
if newHead.Hash() == fmr.headBlockHash {
if f.loggedHeadUpdate {
log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
f.loggedHeadUpdate, f.startHeadUpdate = false, false
}
} else {
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
"remaining", newHead.Number.Uint64()-fmr.headBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
f.loggedHeadUpdate = true
f.lastLogHeadUpdate = time.Now()
}
}
}()
if !f.startHeadUpdate {
f.lastLogHeadUpdate = time.Now()
f.startedHeadUpdate = f.lastLogHeadUpdate
f.startHeadUpdate = true
f.ptrHeadUpdate = f.getRange().headBlockNumber
}
// iterate back from new head until the log index head or a revert point and // iterate back from new head until the log index head or a revert point and
// collect headers of blocks to be added // collect headers of blocks to be added
var ( var (
@ -305,14 +336,41 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
// tryExtendTail attempts to extend the log index backwards until the desired // tryExtendTail attempts to extend the log index backwards until the desired
// indexed history length is achieved. Returns true if finished. // indexed history length is achieved. Returns true if finished.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
defer func() {
fmr := f.getRange()
if fmr.tailBlockNumber <= tailTarget {
if f.loggedTailExtend {
log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
f.loggedTailExtend = false
}
}
}()
fmr := f.getRange() fmr := f.getRange()
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
if !f.loggedTailExtend {
f.lastLogTailExtend = time.Now()
f.startedTailExtend = f.lastLogTailExtend
f.ptrTailExtend = fmr.tailBlockNumber
}
update := f.newUpdateBatch() update := f.newUpdateBatch()
lastTailEpoch := update.tailEpoch() lastTailEpoch := update.tailEpoch()
for number > tailTarget && !stopFn() { for number > tailTarget && !stopFn() {
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch { if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
// limit the amount of data updated in a single batch // limit the amount of data updated in a single batch
f.applyUpdateBatch(update) f.applyUpdateBatch(update)
if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend {
log.Info("Reverse log indexing in progress", "history", update.headBlockNumber+1-update.tailBlockNumber,
"processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget,
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
f.loggedTailExtend = true
f.lastLogTailExtend = time.Now()
}
update = f.newUpdateBatch() update = f.newUpdateBatch()
lastTailEpoch = tailEpoch lastTailEpoch = tailEpoch
} }
@ -339,10 +397,27 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
// tryUnindexTail attempts to prune the log index tail until the desired indexed // tryUnindexTail attempts to prune the log index tail until the desired indexed
// history length is achieved. Returns true if finished. // history length is achieved. Returns true if finished.
func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool { func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool {
if !f.loggedTailUnindex {
f.lastLogTailUnindex = time.Now()
f.startedTailUnindex = f.lastLogTailUnindex
f.ptrTailUnindex = f.getRange().tailBlockNumber
}
for { for {
if f.unindexTailEpoch(tailTarget) { if f.unindexTailEpoch(tailTarget) {
fmr := f.getRange()
log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
f.loggedTailUnindex = false
return true return true
} }
if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex {
fmr := f.getRange()
log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex)))
f.loggedTailUnindex = true
f.lastLogTailUnindex = time.Now()
}
if stopFn() { if stopFn() {
return false return false
} }
@ -402,6 +477,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
// by updating the tail pointers, except for targetLvPointer which is not changed // by updating the tail pointers, except for targetLvPointer which is not changed
// yet as it marks the tail of the log index data stored in the database and // yet as it marks the tail of the log index data stored in the database and
// therefore should be updated when map data is actually removed. // therefore should be updated when map data is actually removed.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) { func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) {
// obtain target log value pointer // obtain target log value pointer
if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber { if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
@ -542,7 +618,6 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err) log.Crit("Could not write update batch", "error", err)
} }
log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer)
} }
// updatedRangeLength returns the length of the updated filter map range. // updatedRangeLength returns the length of the updated filter map range.

@ -167,6 +167,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
// valid range with the current indexed range. This function should be called // valid range with the current indexed range. This function should be called
// whenever a part of the log index has been removed, before adding new blocks // whenever a part of the log index has been removed, before adding new blocks
// to it. // to it.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) updateMatchersValidRange() { func (f *FilterMaps) updateMatchersValidRange() {
for fm := range f.matchers { for fm := range f.matchers {
if !f.initialized { if !f.initialized {

Loading…
Cancel
Save