From e82c9994c1e76bd170e29a3fd8a7bf1b00629578 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Fri, 27 Sep 2024 22:35:28 +0200 Subject: [PATCH] core/filtermaps: added history.logs parameter --- cmd/geth/chaincmd.go | 2 + cmd/geth/main.go | 2 + cmd/utils/flags.go | 17 ++++ core/filtermaps/filtermaps.go | 49 +++++++--- core/filtermaps/indexer.go | 168 ++++++++++++++++++++++++++++++---- core/rawdb/schema.go | 2 +- eth/backend.go | 2 +- eth/ethconfig/config.go | 3 + 8 files changed, 212 insertions(+), 33 deletions(-) diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index d85e4a83c8..1849692a74 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -101,6 +101,8 @@ if one is set. Otherwise it prints the genesis from the datadir.`, utils.VMTraceFlag, utils.VMTraceJsonConfigFlag, utils.TransactionHistoryFlag, + utils.LogHistoryFlag, + utils.LogNoHistoryFlag, utils.StateHistoryFlag, }, utils.DatabaseFlags), Description: ` diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2675a61675..ef0d1ed2e4 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -88,6 +88,8 @@ var ( utils.SnapshotFlag, utils.TxLookupLimitFlag, // deprecated utils.TransactionHistoryFlag, + utils.LogHistoryFlag, + utils.LogNoHistoryFlag, utils.StateHistoryFlag, utils.LightServeFlag, // deprecated utils.LightIngressFlag, // deprecated diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e98d5cc8e3..3ba90fda80 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -279,6 +279,17 @@ var ( Value: ethconfig.Defaults.TransactionHistory, Category: flags.StateCategory, } + LogHistoryFlag = &cli.Uint64Flag{ + Name: "history.logs", + Usage: "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)", + Value: ethconfig.Defaults.LogHistory, + Category: flags.StateCategory, + } + LogNoHistoryFlag = &cli.BoolFlag{ + Name: "history.logs.disable", + Usage: "Do not maintain log search index", + Category: flags.StateCategory, + } // Beacon client light sync settings BeaconApiFlag = &cli.StringSliceFlag{ Name: "beacon.api", @@ -1728,6 +1739,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.StateScheme = rawdb.HashScheme log.Warn("Forcing hash state-scheme for archive mode") } + if ctx.IsSet(LogHistoryFlag.Name) { + cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name) + } + if ctx.IsSet(LogNoHistoryFlag.Name) { + cfg.LogNoHistory = true + } if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) { cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100 } diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 3b19cc2cd2..3ba2856e29 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -44,10 +44,13 @@ type blockchain interface { // without the tree hashing and consensus changes: // https://eips.ethereum.org/EIPS/eip-7745 type FilterMaps struct { - lock sync.RWMutex - db ethdb.Database - closeCh chan struct{} - closeWg sync.WaitGroup + lock sync.RWMutex + db ethdb.Database + closeCh chan struct{} + closeWg sync.WaitGroup + history uint64 + noHistory bool + filterMapsRange chain blockchain matcherSyncCh chan *FilterMapsMatcherBackend @@ -87,24 +90,32 @@ var emptyRow = FilterRow{} // filterMapsRange describes the block range that has been indexed and the log // value index range it has been mapped to. +// Note that tailBlockLvPointer points to the earliest log value index belonging +// to the tail block while tailLvPointer points to the earliest log value index +// added to the corresponding filter map. The latter might point to an earlier +// index after tail blocks have been pruned because we do not remove tail values +// one by one, rather delete entire maps when all blocks that had log values in +// those maps are unindexed. type filterMapsRange struct { - initialized bool - headLvPointer, tailLvPointer uint64 - headBlockNumber, tailBlockNumber uint64 - headBlockHash, tailParentHash common.Hash + initialized bool + headLvPointer, tailLvPointer, tailBlockLvPointer uint64 + headBlockNumber, tailBlockNumber uint64 + headBlockHash, tailParentHash common.Hash } // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep // the structure in sync with the given blockchain. -func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps { +func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistory bool) *FilterMaps { rs, err := rawdb.ReadFilterMapsRange(db) if err != nil { log.Error("Error reading log index range", "error", err) } fm := &FilterMaps{ - db: db, - chain: chain, - closeCh: make(chan struct{}), + db: db, + chain: chain, + closeCh: make(chan struct{}), + history: history, + noHistory: noHistory, filterMapsRange: filterMapsRange{ initialized: rs.Initialized, headLvPointer: rs.HeadLvPointer, @@ -121,6 +132,11 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps { lvPointerCache: lru.NewCache[uint64, uint64](1000), revertPoints: make(map[uint64]*revertPoint), } + fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber) + if err != nil { + log.Error("Error fetching tail block pointer, resetting log index", "error", err) + fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database + } fm.closeWg.Add(2) go fm.removeBloomBits() go fm.updateLoop() @@ -200,7 +216,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool { // setRange updates the covered range and also adds the changes to the given batch. // Note that this function assumes that the read/write lock is being held. -func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) { +func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRange) { f.filterMapsRange = newRange rs := rawdb.FilterMapsRange{ Initialized: newRange.initialized, @@ -227,7 +243,7 @@ func (f *FilterMaps) updateMapCache() { defer f.filterMapLock.Unlock() newFilterMapCache := make(map[uint32]*filterMap) - firstMap, afterLastMap := uint32(f.tailLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) + firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) headCacheFirst := firstMap + 1 if afterLastMap > headCacheFirst+headCacheSize { headCacheFirst = afterLastMap - headCacheSize @@ -255,7 +271,7 @@ func (f *FilterMaps) updateMapCache() { // If this is not the case then an invalid result or an error may be returned. // Note that this function assumes that the read lock is being held. func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { - if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer { + if lvIndex < f.tailBlockLvPointer || lvIndex > f.headLvPointer { return nil, nil } // find possible block range based on map to block pointers @@ -264,6 +280,9 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { if err != nil { return nil, err } + if firstBlockNumber < f.tailBlockNumber { + firstBlockNumber = f.tailBlockNumber + } var lastBlockNumber uint64 if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) { lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1) diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 9c2235a03f..d29debe135 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -23,6 +23,11 @@ const ( // canonical chain. func (f *FilterMaps) updateLoop() { defer f.closeWg.Done() + + if f.noHistory { + f.reset() + return + } f.updateMapCache() if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil { f.revertPoints[rp.blockNumber] = rp @@ -106,7 +111,7 @@ func (f *FilterMaps) updateLoop() { syncMatcher = nil } // log index head is at latest chain head; process tail blocks if possible - f.tryExtendTail(func() bool { + f.tryUpdateTail(head, func() bool { // return true if tail processing needs to be stopped select { case ev := <-headEventCh: @@ -236,19 +241,35 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { return true } -// tryExtendTail attempts to extend the log index backwards until it indexes the -// genesis block or cannot find more block receipts. Since this is a long process, -// stopFn is called after adding each tail block and if it returns true, the +// tryUpdateTail attempts to extend or prune the log index according to the +// current head block number and the log history settings. +// stopFn is called regularly during the process, and if it returns true, the // latest batch is written and the function returns. -func (f *FilterMaps) tryExtendTail(stopFn func() bool) { +func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) { + var tailTarget uint64 + if f.history > 0 { + if headNum := head.Number.Uint64(); headNum >= f.history { + tailTarget = headNum + 1 - f.history + } + } + tailNum := f.getRange().tailBlockNumber + if tailNum > tailTarget { + f.tryExtendTail(tailTarget, stopFn) + } + if tailNum < tailTarget { + f.pruneTailPtr(tailTarget) + f.tryPruneTailMaps(tailTarget, stopFn) + } +} + +// tryExtendTail attempts to extend the log index backwards until it indexes the +// tail target block or cannot find more block receipts. +func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) { fmr := f.getRange() number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash - if number == 0 { - return - } update := f.newUpdateBatch() lastTailEpoch := update.tailEpoch() - for number > 0 && !stopFn() { + for number > tailTarget && !stopFn() { if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) @@ -274,6 +295,114 @@ func (f *FilterMaps) tryExtendTail(stopFn func() bool) { f.applyUpdateBatch(update) } +// pruneTailPtr updates the tail block number and hash and the corresponding +// tailBlockLvPointer according to the given tail target block number. +// Note that this function does not remove old index data, only marks it unused +// by updating the tail pointers, except for targetLvPointer which is unchanged +// as it marks the tail of the log index data stored in the database. +func (f *FilterMaps) pruneTailPtr(tailTarget uint64) { + f.lock.Lock() + defer f.lock.Unlock() + + // obtain target log value pointer + if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber { + return // nothing to do + } + targetLvPointer, err := f.getBlockLvPointer(tailTarget) + fmr := f.filterMapsRange + + if err != nil { + log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err) + } + + // obtain tail target's parent hash + var tailParentHash common.Hash + if tailTarget > 0 { + if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash { + return // if a reorg is happening right now then try again later + } + tailParentHash = f.chain.GetCanonicalHash(tailTarget - 1) + if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash { + return // check again to make sure that tailParentHash is consistent with the indexed chain + } + } + + fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash + fmr.tailBlockLvPointer = targetLvPointer + f.setRange(f.db, fmr) +} + +// tryPruneTailMaps removes unused filter maps and corresponding log index +// pointers from the database. This function also updates targetLvPointer. +func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) { + fmr := f.getRange() + tailMap := uint32(fmr.tailLvPointer >> logValuesPerMap) + targetMap := uint32(fmr.tailBlockLvPointer >> logValuesPerMap) + if tailMap >= targetMap { + return + } + lastEpoch := (targetMap - 1) >> logMapsPerEpoch + removeLvPtr, err := f.getMapBlockPtr(tailMap) + if err != nil { + log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err) + removeLvPtr = math.MaxUint64 // do not remove anything + } + var ( + logged bool + lastLogged time.Time + ) + for tailMap < targetMap && !stopFn() { + tailEpoch := tailMap >> logMapsPerEpoch + if tailEpoch == lastEpoch { + f.pruneMaps(tailMap, targetMap, &removeLvPtr) + break + } + nextTailMap := (tailEpoch + 1) << logMapsPerEpoch + f.pruneMaps(tailMap, nextTailMap, &removeLvPtr) + tailMap = nextTailMap + if !logged || time.Since(lastLogged) >= time.Second*10 { + log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap) + logged, lastLogged = true, time.Now() + } + } + if logged { + log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap) + } +} + +// pruneMaps removes filter maps and corresponding log index pointers in the +// specified range in a single batch. +func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) { + nextBlockNumber, err := f.getMapBlockPtr(afterLast) + if err != nil { + log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err) + nextBlockNumber = 0 // do not remove anything + } + batch := f.db.NewBatch() + for *removeLvPtr < nextBlockNumber { + f.deleteBlockLvPointer(batch, *removeLvPtr) + (*removeLvPtr)++ + } + for mapIndex := first; mapIndex < afterLast; mapIndex++ { + f.deleteMapBlockPtr(batch, mapIndex) + } + for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ { + for mapIndex := first; mapIndex < afterLast; mapIndex++ { + f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow) + } + } + fmr := f.getRange() + fmr.tailLvPointer = uint64(afterLast) << logValuesPerMap + if fmr.tailLvPointer > fmr.tailBlockLvPointer { + log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer) + return + } + f.setRange(batch, fmr) + if err := batch.Write(); err != nil { + log.Crit("Could not write update batch", "error", err) + } +} + // updateBatch is a memory overlay collecting changes to the index log structure // that can be written to the database in a single batch while the in-memory // representations in FilterMaps are also updated. @@ -368,7 +497,7 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) { if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } - log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailLvPointer) + log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer) } // updatedRangeLength returns the lenght of the updated filter map range. @@ -378,7 +507,7 @@ func (u *updateBatch) updatedRangeLength() uint32 { // tailEpoch returns the tail epoch index. func (u *updateBatch) tailEpoch() uint32 { - return uint32(u.tailLvPointer >> (logValuesPerMap + logMapsPerEpoch)) + return uint32(u.tailBlockLvPointer >> (logValuesPerMap + logMapsPerEpoch)) } // getRowPtr returns a pointer to a FilterRow that can be modified. If the batch @@ -416,8 +545,8 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt return errors.New("already initialized") } u.initialized = true - u.headLvPointer, u.tailLvPointer = startLvPointer, startLvPointer - u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() //TODO genesis? + u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer + u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash u.addBlockToHead(header, receipts) return nil @@ -470,16 +599,23 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip // addValueToTail adds a single log value to the tail of the log index. func (u *updateBatch) addValueToTail(logValue common.Hash) error { - if u.tailLvPointer == 0 { + if u.tailBlockLvPointer == 0 { return errors.New("tail log value pointer underflow") } + if u.tailBlockLvPointer < u.tailLvPointer { + panic("tailBlockLvPointer < tailLvPointer") + } + u.tailBlockLvPointer-- + if u.tailBlockLvPointer >= u.tailLvPointer { + return nil // already added to the map + } u.tailLvPointer-- - mapIndex := uint32(u.tailLvPointer >> logValuesPerMap) + mapIndex := uint32(u.tailBlockLvPointer >> logValuesPerMap) rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue)) if err != nil { return err } - column := columnIndex(u.tailLvPointer, logValue) + column := columnIndex(u.tailBlockLvPointer, logValue) *rowPtr = append(*rowPtr, 0) copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1]) (*rowPtr)[0] = column diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 29e9adb42b..0948fa9d98 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -145,7 +145,7 @@ var ( FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash SyncCommitteeKey = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee - FilterMapsPrefix = []byte("fT5-") //TODO fm- + FilterMapsPrefix = []byte("fm-") filterMapsRangeKey = append(FilterMapsPrefix, byte('R')) filterMapRowPrefix = append(FilterMapsPrefix, byte('r')) // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row filterMapBlockPtrPrefix = append(FilterMapsPrefix, byte('b')) // filterMapBlockPtrPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian) diff --git a/eth/backend.go b/eth/backend.go index 1ffca08ea7..442f97eb2d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain) + eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, config.LogHistory, config.LogNoHistory) if config.BlobPool.Datadir != "" { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index c325e5010c..bcf1d1752a 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -52,6 +52,7 @@ var Defaults = Config{ NetworkId: 0, // enable auto configuration of networkID == chainID TxLookupLimit: 2350000, TransactionHistory: 2350000, + LogHistory: 2350000, StateHistory: params.FullImmutabilityThreshold, DatabaseCache: 512, TrieCleanCache: 154, @@ -94,6 +95,8 @@ type Config struct { TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. + LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained. + LogNoHistory bool `toml:",omitempty"` // No log search index is maintained. StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved. // State scheme represents the scheme used to store ethereum states and trie