core/filtermaps: added history.logs parameter

pull/30370/head
Zsolt Felfoldi 1 month ago
parent 4196e34a0d
commit e82c9994c1
Changed files:
  1. cmd/geth/chaincmd.go (2 lines changed)
  2. cmd/geth/main.go (2 lines changed)
  3. cmd/utils/flags.go (17 lines changed)
  4. core/filtermaps/filtermaps.go (49 lines changed)
  5. core/filtermaps/indexer.go (168 lines changed)
  6. core/rawdb/schema.go (2 lines changed)
  7. eth/backend.go (2 lines changed)
  8. eth/ethconfig/config.go (3 lines changed)

--- a/cmd/geth/chaincmd.go
+++ b/cmd/geth/chaincmd.go
@@ -101,6 +101,8 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
     utils.VMTraceFlag,
     utils.VMTraceJsonConfigFlag,
     utils.TransactionHistoryFlag,
+    utils.LogHistoryFlag,
+    utils.LogNoHistoryFlag,
     utils.StateHistoryFlag,
 }, utils.DatabaseFlags),
 Description: `

--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -88,6 +88,8 @@ var (
     utils.SnapshotFlag,
     utils.TxLookupLimitFlag, // deprecated
     utils.TransactionHistoryFlag,
+    utils.LogHistoryFlag,
+    utils.LogNoHistoryFlag,
     utils.StateHistoryFlag,
     utils.LightServeFlag,   // deprecated
     utils.LightIngressFlag, // deprecated

--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -279,6 +279,17 @@ var (
         Value:    ethconfig.Defaults.TransactionHistory,
         Category: flags.StateCategory,
     }
+    LogHistoryFlag = &cli.Uint64Flag{
+        Name:     "history.logs",
+        Usage:    "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)",
+        Value:    ethconfig.Defaults.LogHistory,
+        Category: flags.StateCategory,
+    }
+    LogNoHistoryFlag = &cli.BoolFlag{
+        Name:     "history.logs.disable",
+        Usage:    "Do not maintain log search index",
+        Category: flags.StateCategory,
+    }
     // Beacon client light sync settings
     BeaconApiFlag = &cli.StringSliceFlag{
         Name: "beacon.api",
@@ -1728,6 +1739,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
         cfg.StateScheme = rawdb.HashScheme
         log.Warn("Forcing hash state-scheme for archive mode")
     }
+    if ctx.IsSet(LogHistoryFlag.Name) {
+        cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
+    }
+    if ctx.IsSet(LogNoHistoryFlag.Name) {
+        cfg.LogNoHistory = true
+    }
     if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
         cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
     }
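The two settings are independent: --history.logs.disable switches the index off entirely, while --history.logs only bounds its depth. A minimal sketch of how the pair resolves into an effective setting (the standalone helper and its name are mine for illustration, not part of the diff):

package main

import "fmt"

// effectiveLogHistory is a hypothetical helper mirroring the flag semantics
// above: disabling wins over any history length, and a length of 0 means the
// whole chain is indexed.
func effectiveLogHistory(logHistory uint64, logNoHistory bool) string {
    switch {
    case logNoHistory:
        return "no log index maintained"
    case logHistory == 0:
        return "log index for the entire chain"
    default:
        return fmt.Sprintf("log index for the most recent %d blocks", logHistory)
    }
}

func main() {
    fmt.Println(effectiveLogHistory(2350000, false)) // the default, about one year
    fmt.Println(effectiveLogHistory(0, false))
    fmt.Println(effectiveLogHistory(2350000, true))
}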

--- a/core/filtermaps/filtermaps.go
+++ b/core/filtermaps/filtermaps.go
@@ -44,10 +44,13 @@ type blockchain interface {
 // without the tree hashing and consensus changes:
 // https://eips.ethereum.org/EIPS/eip-7745
 type FilterMaps struct {
-    lock    sync.RWMutex
-    db      ethdb.Database
-    closeCh chan struct{}
-    closeWg sync.WaitGroup
+    lock      sync.RWMutex
+    db        ethdb.Database
+    closeCh   chan struct{}
+    closeWg   sync.WaitGroup
+    history   uint64
+    noHistory bool
+
     filterMapsRange
     chain         blockchain
     matcherSyncCh chan *FilterMapsMatcherBackend
@@ -87,24 +90,32 @@ var emptyRow = FilterRow{}
 // filterMapsRange describes the block range that has been indexed and the log
 // value index range it has been mapped to.
+// Note that tailBlockLvPointer points to the earliest log value index belonging
+// to the tail block while tailLvPointer points to the earliest log value index
+// added to the corresponding filter map. The latter might point to an earlier
+// index after tail blocks have been pruned because we do not remove tail values
+// one by one, rather delete entire maps when all blocks that had log values in
+// those maps are unindexed.
 type filterMapsRange struct {
-    initialized                      bool
-    headLvPointer, tailLvPointer     uint64
-    headBlockNumber, tailBlockNumber uint64
-    headBlockHash, tailParentHash    common.Hash
+    initialized                                      bool
+    headLvPointer, tailLvPointer, tailBlockLvPointer uint64
+    headBlockNumber, tailBlockNumber                 uint64
+    headBlockHash, tailParentHash                    common.Hash
 }

 // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
 // the structure in sync with the given blockchain.
-func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
+func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistory bool) *FilterMaps {
     rs, err := rawdb.ReadFilterMapsRange(db)
     if err != nil {
         log.Error("Error reading log index range", "error", err)
     }
     fm := &FilterMaps{
-        db:      db,
-        chain:   chain,
-        closeCh: make(chan struct{}),
+        db:        db,
+        chain:     chain,
+        closeCh:   make(chan struct{}),
+        history:   history,
+        noHistory: noHistory,
         filterMapsRange: filterMapsRange{
             initialized:   rs.Initialized,
             headLvPointer: rs.HeadLvPointer,
@@ -121,6 +132,11 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
         lvPointerCache: lru.NewCache[uint64, uint64](1000),
         revertPoints:   make(map[uint64]*revertPoint),
     }
+    fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber)
+    if err != nil {
+        log.Error("Error fetching tail block pointer, resetting log index", "error", err)
+        fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
+    }
     fm.closeWg.Add(2)
     go fm.removeBloomBits()
     go fm.updateLoop()
@@ -200,7 +216,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {

 // setRange updates the covered range and also adds the changes to the given batch.
 // Note that this function assumes that the read/write lock is being held.
-func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
+func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRange) {
     f.filterMapsRange = newRange
     rs := rawdb.FilterMapsRange{
         Initialized: newRange.initialized,
@@ -227,7 +243,7 @@ func (f *FilterMaps) updateMapCache() {
     defer f.filterMapLock.Unlock()

     newFilterMapCache := make(map[uint32]*filterMap)
-    firstMap, afterLastMap := uint32(f.tailLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
+    firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
     headCacheFirst := firstMap + 1
     if afterLastMap > headCacheFirst+headCacheSize {
         headCacheFirst = afterLastMap - headCacheSize
@@ -255,7 +271,7 @@ func (f *FilterMaps) updateMapCache() {
 // If this is not the case then an invalid result or an error may be returned.
 // Note that this function assumes that the read lock is being held.
 func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
-    if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
+    if lvIndex < f.tailBlockLvPointer || lvIndex > f.headLvPointer {
         return nil, nil
     }
     // find possible block range based on map to block pointers
@@ -264,6 +280,9 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
     if err != nil {
         return nil, err
     }
+    if firstBlockNumber < f.tailBlockNumber {
+        firstBlockNumber = f.tailBlockNumber
+    }
     var lastBlockNumber uint64
     if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) {
         lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
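Each filter map covers a fixed power-of-two number of log values, so the map index arithmetic above is plain shifting. A sketch with an assumed constant (the real logValuesPerMap lives in the filtermaps package; 16 here is an assumption for illustration):

package main

import "fmt"

// logValuesPerMap is an assumption for this sketch; the real constant is
// defined in core/filtermaps.
const logValuesPerMap = 16

// mapOf reproduces the arithmetic used above: the map containing a given log
// value pointer.
func mapOf(lvPointer uint64) uint32 { return uint32(lvPointer >> logValuesPerMap) }

// afterLastMap rounds the head pointer up to the first unused map index.
func afterLastMap(headLvPointer uint64) uint32 {
    const valuesPerMap = 1 << logValuesPerMap
    return uint32((headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
}

func main() {
    fmt.Println(mapOf(70000))        // 1: the second map
    fmt.Println(afterLastMap(90000)) // 2: maps 0..1 are in use
}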

--- a/core/filtermaps/indexer.go
+++ b/core/filtermaps/indexer.go
@@ -23,6 +23,11 @@ const (
 // canonical chain.
 func (f *FilterMaps) updateLoop() {
     defer f.closeWg.Done()
+
+    if f.noHistory {
+        f.reset()
+        return
+    }
     f.updateMapCache()
     if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
         f.revertPoints[rp.blockNumber] = rp
@@ -106,7 +111,7 @@ func (f *FilterMaps) updateLoop() {
             syncMatcher = nil
         }
         // log index head is at latest chain head; process tail blocks if possible
-        f.tryExtendTail(func() bool {
+        f.tryUpdateTail(head, func() bool {
             // return true if tail processing needs to be stopped
             select {
             case ev := <-headEventCh:
@@ -236,19 +241,35 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
     return true
 }

-// tryExtendTail attempts to extend the log index backwards until it indexes the
-// genesis block or cannot find more block receipts. Since this is a long process,
-// stopFn is called after adding each tail block and if it returns true, the
+// tryUpdateTail attempts to extend or prune the log index according to the
+// current head block number and the log history settings.
+// stopFn is called regularly during the process, and if it returns true, the
 // latest batch is written and the function returns.
-func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
+func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) {
+    var tailTarget uint64
+    if f.history > 0 {
+        if headNum := head.Number.Uint64(); headNum >= f.history {
+            tailTarget = headNum + 1 - f.history
+        }
+    }
+    tailNum := f.getRange().tailBlockNumber
+    if tailNum > tailTarget {
+        f.tryExtendTail(tailTarget, stopFn)
+    }
+    if tailNum < tailTarget {
+        f.pruneTailPtr(tailTarget)
+        f.tryPruneTailMaps(tailTarget, stopFn)
+    }
+}
+
+// tryExtendTail attempts to extend the log index backwards until it indexes the
+// tail target block or cannot find more block receipts.
+func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
     fmr := f.getRange()
     number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
-    if number == 0 {
-        return
-    }
     update := f.newUpdateBatch()
     lastTailEpoch := update.tailEpoch()
-    for number > 0 && !stopFn() {
+    for number > tailTarget && !stopFn() {
         if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
             // limit the amount of data updated in a single batch
             f.applyUpdateBatch(update)
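The tail-target rule above in isolation, with a worked example (the standalone function is mine; the arithmetic is copied from tryUpdateTail):

package main

import "fmt"

// tailTarget mirrors the computation in tryUpdateTail: the oldest block that
// must stay indexed, where history == 0 means "keep everything".
func tailTarget(headNum, history uint64) uint64 {
    if history > 0 && headNum >= history {
        return headNum + 1 - history
    }
    return 0
}

func main() {
    // With history = 100000 and head = 1000000, blocks 900001..1000000
    // (exactly 100000 blocks) stay indexed.
    fmt.Println(tailTarget(1000000, 100000)) // 900001
    fmt.Println(tailTarget(50000, 100000))   // 0: chain shorter than history
    fmt.Println(tailTarget(1000000, 0))      // 0: keep the entire chain
}

Whichever side of the target the current tail falls on decides the action: a tail above the target extends backwards, a tail below it triggers the prune pair.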
@@ -274,6 +295,114 @@ func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
     f.applyUpdateBatch(update)
 }

+// pruneTailPtr updates the tail block number and hash and the corresponding
+// tailBlockLvPointer according to the given tail target block number.
+// Note that this function does not remove old index data, only marks it unused
+// by updating the tail pointers, except for targetLvPointer which is unchanged
+// as it marks the tail of the log index data stored in the database.
+func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
+    f.lock.Lock()
+    defer f.lock.Unlock()
+
+    // obtain target log value pointer
+    if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
+        return // nothing to do
+    }
+    targetLvPointer, err := f.getBlockLvPointer(tailTarget)
+    fmr := f.filterMapsRange
+    if err != nil {
+        log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err)
+    }
+    // obtain tail target's parent hash
+    var tailParentHash common.Hash
+    if tailTarget > 0 {
+        if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash {
+            return // if a reorg is happening right now then try again later
+        }
+        tailParentHash = f.chain.GetCanonicalHash(tailTarget - 1)
+        if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash {
+            return // check again to make sure that tailParentHash is consistent with the indexed chain
+        }
+    }
+    fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
+    fmr.tailBlockLvPointer = targetLvPointer
+    f.setRange(f.db, fmr)
+}
+
+// tryPruneTailMaps removes unused filter maps and corresponding log index
+// pointers from the database. This function also updates targetLvPointer.
+func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
+    fmr := f.getRange()
+    tailMap := uint32(fmr.tailLvPointer >> logValuesPerMap)
+    targetMap := uint32(fmr.tailBlockLvPointer >> logValuesPerMap)
+    if tailMap >= targetMap {
+        return
+    }
+    lastEpoch := (targetMap - 1) >> logMapsPerEpoch
+    removeLvPtr, err := f.getMapBlockPtr(tailMap)
+    if err != nil {
+        log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err)
+        removeLvPtr = math.MaxUint64 // do not remove anything
+    }
+    var (
+        logged     bool
+        lastLogged time.Time
+    )
+    for tailMap < targetMap && !stopFn() {
+        tailEpoch := tailMap >> logMapsPerEpoch
+        if tailEpoch == lastEpoch {
+            f.pruneMaps(tailMap, targetMap, &removeLvPtr)
+            break
+        }
+        nextTailMap := (tailEpoch + 1) << logMapsPerEpoch
+        f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
+        tailMap = nextTailMap
+        if !logged || time.Since(lastLogged) >= time.Second*10 {
+            log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap)
+            logged, lastLogged = true, time.Now()
+        }
+    }
+    if logged {
+        log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap)
+    }
+}
+
+// pruneMaps removes filter maps and corresponding log index pointers in the
+// specified range in a single batch.
+func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
+    nextBlockNumber, err := f.getMapBlockPtr(afterLast)
+    if err != nil {
+        log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err)
+        nextBlockNumber = 0 // do not remove anything
+    }
+    batch := f.db.NewBatch()
+    for *removeLvPtr < nextBlockNumber {
+        f.deleteBlockLvPointer(batch, *removeLvPtr)
+        (*removeLvPtr)++
+    }
+    for mapIndex := first; mapIndex < afterLast; mapIndex++ {
+        f.deleteMapBlockPtr(batch, mapIndex)
+    }
+    for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
+        for mapIndex := first; mapIndex < afterLast; mapIndex++ {
+            f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
+        }
+    }
+    fmr := f.getRange()
+    fmr.tailLvPointer = uint64(afterLast) << logValuesPerMap
+    if fmr.tailLvPointer > fmr.tailBlockLvPointer {
+        log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
+        return
+    }
+    f.setRange(batch, fmr)
+    if err := batch.Write(); err != nil {
+        log.Crit("Could not write update batch", "error", err)
+    }
+}
+
 // updateBatch is a memory overlay collecting changes to the index log structure
 // that can be written to the database in a single batch while the in-memory
 // representations in FilterMaps are also updated.
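tryPruneTailMaps steps epoch by epoch so each database batch stays bounded, finishing with a partial step inside the target's epoch. A sketch of the boundary stepping with an assumed epoch size (the real logMapsPerEpoch constant lives in the filtermaps package; 6 here is an assumption):

package main

import "fmt"

// logMapsPerEpoch is an assumption for this sketch (64 maps per epoch); the
// real constant is defined in core/filtermaps.
const logMapsPerEpoch = 6

// pruneSteps reproduces the loop structure above: advance to the next epoch
// boundary each iteration, then take one partial step in the last epoch.
func pruneSteps(tailMap, targetMap uint32) (steps [][2]uint32) {
    lastEpoch := (targetMap - 1) >> logMapsPerEpoch
    for tailMap < targetMap {
        if tailMap>>logMapsPerEpoch == lastEpoch {
            steps = append(steps, [2]uint32{tailMap, targetMap})
            break
        }
        next := (tailMap>>logMapsPerEpoch + 1) << logMapsPerEpoch
        steps = append(steps, [2]uint32{tailMap, next})
        tailMap = next
    }
    return steps
}

func main() {
    // Pruning maps 10..200 proceeds as [10,64), [64,128), [128,192), [192,200).
    fmt.Println(pruneSteps(10, 200))
}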
@@ -368,7 +497,7 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
     if err := batch.Write(); err != nil {
         log.Crit("Could not write update batch", "error", err)
     }
-    log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailLvPointer)
+    log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer)
 }

 // updatedRangeLength returns the length of the updated filter map range.
@@ -378,7 +507,7 @@ func (u *updateBatch) updatedRangeLength() uint32 {

 // tailEpoch returns the tail epoch index.
 func (u *updateBatch) tailEpoch() uint32 {
-    return uint32(u.tailLvPointer >> (logValuesPerMap + logMapsPerEpoch))
+    return uint32(u.tailBlockLvPointer >> (logValuesPerMap + logMapsPerEpoch))
 }

 // getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
@@ -416,8 +545,8 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts,
         return errors.New("already initialized")
     }
     u.initialized = true
-    u.headLvPointer, u.tailLvPointer = startLvPointer, startLvPointer
-    u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() //TODO genesis?
+    u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
+    u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
     u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
     u.addBlockToHead(header, receipts)
     return nil
@@ -470,16 +599,23 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receipts)

 // addValueToTail adds a single log value to the tail of the log index.
 func (u *updateBatch) addValueToTail(logValue common.Hash) error {
-    if u.tailLvPointer == 0 {
+    if u.tailBlockLvPointer == 0 {
         return errors.New("tail log value pointer underflow")
     }
+    if u.tailBlockLvPointer < u.tailLvPointer {
+        panic("tailBlockLvPointer < tailLvPointer")
+    }
+    u.tailBlockLvPointer--
+    if u.tailBlockLvPointer >= u.tailLvPointer {
+        return nil // already added to the map
+    }
     u.tailLvPointer--
-    mapIndex := uint32(u.tailLvPointer >> logValuesPerMap)
+    mapIndex := uint32(u.tailBlockLvPointer >> logValuesPerMap)
     rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
     if err != nil {
         return err
     }
-    column := columnIndex(u.tailLvPointer, logValue)
+    column := columnIndex(u.tailBlockLvPointer, logValue)
     *rowPtr = append(*rowPtr, 0)
     copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
     (*rowPtr)[0] = column
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -145,7 +145,7 @@ var (
     FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash
     SyncCommitteeKey      = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee

-    FilterMapsPrefix        = []byte("fT5-") //TODO fm-
+    FilterMapsPrefix        = []byte("fm-")
     filterMapsRangeKey      = append(FilterMapsPrefix, byte('R'))
     filterMapRowPrefix      = append(FilterMapsPrefix, byte('r')) // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row
     filterMapBlockPtrPrefix = append(FilterMapsPrefix, byte('b')) // filterMapBlockPtrPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian)
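This settles the prefix rename flagged by the old //TODO. For reference, the schema entries above compose keys as a short table prefix plus a one-byte tag, followed by a big-endian index for the row and pointer tables. A standalone sketch of the composition (variable names are mine):

package main

import "fmt"

func main() {
    // Key layout from the schema above: "fm-" + one-byte table tag; the row
    // and pointer keys then append a big-endian encoded index after the tag.
    prefix := []byte("fm-")
    rangeKey := append(prefix, byte('R'))
    fmt.Printf("%q\n", rangeKey) // "fm-R"
}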

--- a/eth/backend.go
+++ b/eth/backend.go
@@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
     if err != nil {
         return nil, err
     }
-    eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)
+    eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, config.LogHistory, config.LogNoHistory)

     if config.BlobPool.Datadir != "" {
         config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)

--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -52,6 +52,7 @@ var Defaults = Config{
     NetworkId:          0, // enable auto configuration of networkID == chainID
     TxLookupLimit:      2350000,
     TransactionHistory: 2350000,
+    LogHistory:         2350000,
     StateHistory:       params.FullImmutabilityThreshold,
     DatabaseCache:      512,
     TrieCleanCache:     154,
@@ -94,6 +95,8 @@ type Config struct {
     TxLookupLimit      uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
     TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
+    LogHistory         uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained.
+    LogNoHistory       bool   `toml:",omitempty"` // No log search index is maintained.
     StateHistory       uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.

     // State scheme represents the scheme used to store ethereum states and trie
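A minimal sketch of setting the new fields programmatically (the field names come from the diff; the import path is the standard go-ethereum module path, assumed here). With the toml tags shown, the same fields should also be settable in a geth TOML config file.

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/eth/ethconfig"
)

func main() {
    cfg := ethconfig.Defaults  // copy of the default config
    cfg.LogHistory = 500000    // keep a log search index for ~500k recent blocks
    cfg.LogNoHistory = false   // true would disable the index entirely
    fmt.Println(cfg.LogHistory, cfg.LogNoHistory)
}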
