core/filtermaps: always use correct absolute log index

pull/30370/head
Zsolt Felfoldi 1 month ago
parent d164b762c0
commit 7a627160f2
  1. 126
      core/filtermaps/indexer.go

@ -29,7 +29,6 @@ import (
)
const (
startLvMap = 1 << 31 // map index assigned to init block
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory
@ -157,7 +156,7 @@ func (f *FilterMaps) updateLoop() {
// log index is synced to the latest known chain head
matcherSync()
// process tail blocks if possible
if f.tryUpdateTail(head, func() bool {
if f.tryUpdateTail(func() bool {
// return true if tail processing needs to be stopped
select {
case ev := <-headEventCh:
@ -204,13 +203,14 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
if !f.reset() {
return false
}
head = f.chain.GetHeader(f.chain.GetCanonicalHash(0), 0)
receipts := f.chain.GetReceiptsByHash(head.Hash())
if receipts == nil {
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
return true
}
update := f.newUpdateBatch()
if err := update.initWithBlock(head, receipts); err != nil {
if err := update.initWithBlock(head, receipts, 0); err != nil {
log.Error("Could not initialize log index", "error", err)
}
f.applyUpdateBatch(update)
@ -234,21 +234,24 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
defer func() {
if head.Hash() == f.headBlockHash {
if f.loggedHeadUpdate {
log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate,
"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
f.loggedHeadUpdate, f.startHeadUpdate = false, false
}
} else {
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate,
"remaining", head.Number.Uint64()-f.headBlockNumber,
log.Info("Forward log indexing finished", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
"last block", f.headBlockNumber, "processed", f.headBlockNumber-f.ptrHeadUpdate,
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
f.loggedHeadUpdate = true
f.lastLogHeadUpdate = time.Now()
f.loggedHeadUpdate, f.startHeadUpdate = false, false
}
}
}()
printProgressLog := func() {
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
log.Info("Forward log indexing in progress", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
"last block", f.headBlockNumber, "processed", f.headBlockNumber-f.ptrHeadUpdate, "remaining", head.Number.Uint64()-f.headBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
f.loggedHeadUpdate = true
f.lastLogHeadUpdate = time.Now()
}
}
hc := newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
f.revertToCommonAncestor(head.Number.Uint64(), hc)
if !f.initialized {
@ -267,6 +270,7 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
// add new blocks
update := f.newUpdateBatch()
lastHeadEpoch := update.headEpoch()
for update.headBlockNumber < head.Number.Uint64() {
header := hc.getHeader(update.headBlockNumber + 1)
if header == nil {
@ -282,24 +286,48 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
log.Error("Error adding new block", "number", header.Number, "hash", header.Hash(), "error", err)
break
}
if update.updatedRangeLength() >= f.mapsPerEpoch {
if update.headBlockNumber+cachedRevertPoints > head.Number.Uint64() ||
update.headBlockNumber%revertPointFrequency == 0 {
if rp, err := update.makeRevertPoint(); err != nil {
log.Error("Error creating revert point", "block number", update.headBlockNumber, "error", err)
} else if rp != nil {
update.revertPoints[update.headBlockNumber] = rp
}
}
newHead := headFn()
if newHead == nil {
f.applyUpdateBatch(update)
printProgressLog()
return
}
if newHead.Hash() != head.Hash() {
head = newHead
hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash {
f.applyUpdateBatch(update)
printProgressLog()
f.revertToCommonAncestor(head.Number.Uint64(), hc)
if !f.initialized {
return
}
update = f.newUpdateBatch()
}
}
if headEpoch := update.headEpoch(); headEpoch > lastHeadEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
newHead := headFn()
if newHead == nil {
return
// after adding 1 epoch of new log data remove at most 2 epochs of
// unwanted tail data if necessary
tailTarget := f.tailTarget()
if f.tailBlockNumber < tailTarget {
f.unindexTailEpoch(tailTarget)
}
if newHead.Hash() != head.Hash() {
head = newHead
hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash {
f.revertToCommonAncestor(head.Number.Uint64(), hc)
if !f.initialized {
return
}
}
if f.tailBlockNumber < tailTarget {
f.unindexTailEpoch(tailTarget)
}
printProgressLog()
update = f.newUpdateBatch()
lastHeadEpoch = headEpoch
}
}
f.applyUpdateBatch(update)
@ -307,6 +335,9 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
// find the latest revert point that is the ancestor of the new head
func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
if hc.getBlockHash(f.headBlockNumber) == f.headBlockHash {
return
}
var (
number = headNum
rp *revertPoint
@ -332,9 +363,6 @@ func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
f.setRange(f.db, filterMapsRange{})
return
}
if rp.blockHash == f.headBlockHash {
return // found the head revert point, nothing to do
}
// revert to the common ancestor if necessary
if rp.blockNumber+128 <= f.headBlockNumber {
log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", rp.blockNumber)
@ -349,13 +377,8 @@ func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
// stopFn is called regularly during the process, and if it returns true, the
// latest batch is written and the function returns.
// tryUpdateTail returns true if it has reached the desired history length.
func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool {
var tailTarget uint64
if f.history > 0 {
if headNum := head.Number.Uint64(); headNum >= f.history {
tailTarget = headNum + 1 - f.history
}
}
func (f *FilterMaps) tryUpdateTail(stopFn func() bool) bool {
tailTarget := f.tailTarget()
tailNum := f.tailBlockNumber
if tailNum > tailTarget {
if !f.tryExtendTail(tailTarget, stopFn) {
@ -368,14 +391,24 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
return true
}
// tailTarget returns the target value for the tail block number according to the
// log history parameter and the current index head.
func (f *FilterMaps) tailTarget() uint64 {
if f.history == 0 || f.headBlockNumber < f.history {
return 0
}
return f.headBlockNumber + 1 - f.history
}
// tryExtendTail attempts to extend the log index backwards until the desired
// indexed history length is achieved. Returns true if finished.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
defer func() {
if f.tailBlockNumber <= tailTarget {
if f.loggedTailExtend {
log.Info("Reverse log indexing finished", "maps", f.mapCount(f.logValuesPerMap), "history", f.headBlockNumber+1-f.tailBlockNumber,
"processed", f.ptrTailExtend-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
log.Info("Reverse log indexing finished", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
"last block", f.headBlockNumber, "processed", f.ptrTailExtend-f.tailBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
f.loggedTailExtend = false
}
}
@ -397,8 +430,8 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
f.applyUpdateBatch(update)
if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend {
log.Info("Reverse log indexing in progress", "maps", update.mapCount(f.logValuesPerMap), "history", update.headBlockNumber+1-update.tailBlockNumber,
"processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget,
log.Info("Reverse log indexing in progress", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
"last block", f.headBlockNumber, "processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", f.tailBlockNumber-tailTarget,
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
f.loggedTailExtend = true
f.lastLogTailExtend = time.Now()
@ -710,6 +743,11 @@ func (u *updateBatch) updatedRangeLength() uint32 {
return u.afterLastMap - u.firstMap
}
// headEpoch returns the head epoch index.
func (u *updateBatch) headEpoch() uint32 {
return uint32(u.headLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch))
}
// tailEpoch returns the tail epoch index.
func (u *updateBatch) tailEpoch() uint32 {
return uint32(u.tailBlockLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch))
@ -745,12 +783,11 @@ func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
}
// initWithBlock initializes the log index with the given block as head.
func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts) error {
func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts, startLvPointer uint64) error {
if u.initialized {
return errors.New("already initialized")
}
u.initialized = true
startLvPointer := uint64(startLvMap) << u.f.logValuesPerMap
u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
@ -795,11 +832,6 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip
if (u.headBlockNumber-cachedRevertPoints)%revertPointFrequency != 0 {
delete(u.revertPoints, u.headBlockNumber-cachedRevertPoints)
}
if rp, err := u.makeRevertPoint(); err != nil {
return err
} else if rp != nil {
u.revertPoints[u.headBlockNumber] = rp
}
return nil
}

Loading…
Cancel
Save