core/filtermaps: trigger unindexing after 1000 blocks

pull/30370/head
Zsolt Felfoldi 2 months ago
parent 3ce3b80bb3
commit e9959dd878
  1. core/filtermaps/filtermaps.go (10 changed lines)
  2. core/filtermaps/indexer.go (24 changed lines)
  3. core/filtermaps/indexer_test.go (2 changed lines)
  4. eth/backend.go (2 changed lines)
  5. eth/filters/filter_system_test.go (2 changed lines)

core/filtermaps/filtermaps.go

@@ -53,7 +53,7 @@ type FilterMaps struct {
 	db        ethdb.KeyValueStore
 	closeCh   chan struct{}
 	closeWg   sync.WaitGroup
-	history   uint64
+	history, unindexLimit uint64
 	noHistory bool

 	Params
@@ -101,9 +101,9 @@ var emptyRow = FilterRow{}
 // Note that tailBlockLvPointer points to the earliest log value index belonging
 // to the tail block while tailLvPointer points to the earliest log value index
 // added to the corresponding filter map. The latter might point to an earlier
-// index after tail blocks have been pruned because we do not remove tail values
-// one by one, rather delete entire maps when all blocks that had log values in
-// those maps are unindexed.
+// index after tail blocks have been unindexed because we do not remove tail
+// values one by one, rather delete entire maps when all blocks that had log
+// values in those maps are unindexed.
 type filterMapsRange struct {
 	initialized bool
 	headLvPointer, tailLvPointer, tailBlockLvPointer uint64
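
As the reworded comment above says, whole maps are removed at once, so the stored-data tail (tailLvPointer) may sit at or before the tail block's first log value (tailBlockLvPointer), never after. A minimal sketch of that relationship, using a simplified stand-in struct rather than the real filterMapsRange:

package main

import "fmt"

// rangeSketch is a hypothetical stand-in for the two tail pointers discussed above.
type rangeSketch struct {
	tailLvPointer      uint64 // first log value index still stored in a filter map
	tailBlockLvPointer uint64 // first log value index belonging to the tail block
}

// tailInvariant mirrors the error check at the end of unindexMaps: stored data
// may start earlier than the tail block's first log value, never later.
func tailInvariant(r rangeSketch) bool {
	return r.tailLvPointer <= r.tailBlockLvPointer
}

func main() {
	fmt.Println(tailInvariant(rangeSketch{tailLvPointer: 0x1a0000, tailBlockLvPointer: 0x1a5000})) // true
}
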
@@ -113,7 +113,7 @@ type filterMapsRange struct {
 // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
 // the structure in sync with the given blockchain.
-func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
+func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history, unindexLimit uint64, noHistory bool) *FilterMaps {
 	rs, err := rawdb.ReadFilterMapsRange(db)
 	if err != nil {
 		log.Error("Error reading log index range", "error", err)

core/filtermaps/indexer.go

@@ -295,10 +295,10 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
 			return false
 		}
 	}
-	if tailNum < tailTarget {
-		f.pruneTailPtr(tailTarget)
+	if tailNum+f.unindexLimit <= tailTarget {
+		f.unindexTailPtr(tailTarget)
 	}
-	return f.tryPruneTailMaps(tailTarget, stopFn)
+	return f.tryUnindexTailMaps(tailTarget, stopFn)
 }

 // tryExtendTail attempts to extend the log index backwards until it indexes the
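
The behavioural change is in the condition above: the tail pointers used to be moved whenever the tail block was behind the target at all, and now they are only moved once the gap reaches unindexLimit blocks (1000 in eth/backend.go, 1 in the tests), so unindexing is triggered in batches instead of on every head update. A standalone sketch of the check under those assumptions (shouldUnindex is a hypothetical helper mirroring tailNum+f.unindexLimit <= tailTarget):

package main

import "fmt"

// shouldUnindex mirrors the new trigger condition in tryUpdateTail.
func shouldUnindex(tailNum, tailTarget, unindexLimit uint64) bool {
	return tailNum+unindexLimit <= tailTarget
}

func main() {
	fmt.Println(shouldUnindex(5000, 5999, 1000)) // false: tail is only 999 blocks behind the target
	fmt.Println(shouldUnindex(5000, 6000, 1000)) // true: the gap has reached 1000 blocks
	fmt.Println(shouldUnindex(5000, 5001, 1))    // true: limit 1 matches the old tailNum < tailTarget check
}
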
@@ -335,12 +335,12 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
 	return number <= tailTarget
 }

-// pruneTailPtr updates the tail block number and hash and the corresponding
+// unindexTailPtr updates the tail block number and hash and the corresponding
 // tailBlockLvPointer according to the given tail target block number.
 // Note that this function does not remove old index data, only marks it unused
 // by updating the tail pointers, except for targetLvPointer which is unchanged
 // as it marks the tail of the log index data stored in the database.
-func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
+func (f *FilterMaps) unindexTailPtr(tailTarget uint64) {
 	f.lock.Lock()
 	defer f.lock.Unlock()
@@ -372,9 +372,9 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
 	f.setRange(f.db, fmr)
 }

-// tryPruneTailMaps removes unused filter maps and corresponding log index
+// tryUnindexTailMaps removes unused filter maps and corresponding log index
 // pointers from the database. This function also updates targetLvPointer.
-func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) bool {
+func (f *FilterMaps) tryUnindexTailMaps(tailTarget uint64, stopFn func() bool) bool {
 	fmr := f.getRange()
 	tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap)
 	targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap)
@@ -394,11 +394,11 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) bool
 	for tailMap < targetMap && !stopFn() {
 		tailEpoch := tailMap >> f.logMapsPerEpoch
 		if tailEpoch == lastEpoch {
-			f.pruneMaps(tailMap, targetMap, &removeLvPtr)
+			f.unindexMaps(tailMap, targetMap, &removeLvPtr)
 			break
 		}
 		nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch
-		f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
+		f.unindexMaps(tailMap, nextTailMap, &removeLvPtr)
 		tailMap = nextTailMap
 		if !logged || time.Since(lastLogged) >= time.Second*10 {
 			log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap)
@@ -411,9 +411,9 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) bool
 	return tailMap >= targetMap
 }

-// pruneMaps removes filter maps and corresponding log index pointers in the
+// unindexMaps removes filter maps and corresponding log index pointers in the
 // specified range in a single batch.
-func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
+func (f *FilterMaps) unindexMaps(first, afterLast uint32, removeLvPtr *uint64) {
 	nextBlockNumber, err := f.getMapBlockPtr(afterLast)
 	if err != nil {
 		log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err)
@@ -438,7 +438,7 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
 	fmr := f.getRange()
 	fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap
 	if fmr.tailLvPointer > fmr.tailBlockLvPointer {
-		log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
+		log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
 		return
 	}
 	f.setRange(batch, fmr)
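
unindexMaps then rounds the stored-data tail up to the first surviving map: each map covers 2^logValuesPerMap log values, so map indices and log value pointers convert by shifting, as in tailLvPointer = afterLast << logValuesPerMap above. A small worked sketch with an assumed logValuesPerMap of 16 (the real value comes from Params):

package main

import "fmt"

func main() {
	const logValuesPerMap = 16 // assumed for illustration only

	// A log value pointer maps to the filter map containing it, as in tryUnindexTailMaps.
	tailBlockLvPointer := uint64(0x1a5000)
	targetMap := uint32(tailBlockLvPointer >> logValuesPerMap)

	// After deleting maps [first, afterLast), the stored tail starts at the
	// first log value of map afterLast, as in unindexMaps.
	afterLast := targetMap
	tailLvPointer := uint64(afterLast) << logValuesPerMap

	fmt.Printf("targetMap=%d tailLvPointer=%#x (<= tailBlockLvPointer %#x)\n",
		targetMap, tailLvPointer, tailBlockLvPointer)
}
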

core/filtermaps/indexer_test.go

@@ -207,7 +207,7 @@ func (ts *testSetup) setHistory(history uint64, noHistory bool) {
 	if ts.fm != nil {
 		ts.fm.Stop()
 	}
-	ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, noHistory)
+	ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, 1, noHistory)
 	ts.fm.Start()
 }

eth/backend.go

@@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	if err != nil {
 		return nil, err
 	}
-	eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, config.LogNoHistory)
+	eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory)

 	if config.BlobPool.Datadir != "" {
 		config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)

eth/filters/filter_system_test.go

@@ -160,7 +160,7 @@ func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
 }

 func (b *testBackend) startFilterMaps(history uint64, noHistory bool) {
-	b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory)
+	b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, 1, noHistory)
 	b.fm.Start()
 	b.fm.WaitIdle()
 }
