diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index e844b5013c..ad4dabae2c 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -29,7 +29,6 @@ import ( ) const ( - startLvMap = 1 << 31 // map index assigned to init block removedPointer = math.MaxUint64 // used in updateBatch to signal removed items revertPointFrequency = 256 // frequency of revert points in database cachedRevertPoints = 64 // revert points for most recent blocks in memory @@ -157,7 +156,7 @@ func (f *FilterMaps) updateLoop() { // log index is synced to the latest known chain head matcherSync() // process tail blocks if possible - if f.tryUpdateTail(head, func() bool { + if f.tryUpdateTail(func() bool { // return true if tail processing needs to be stopped select { case ev := <-headEventCh: @@ -204,13 +203,14 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { if !f.reset() { return false } + head = f.chain.GetHeader(f.chain.GetCanonicalHash(0), 0) receipts := f.chain.GetReceiptsByHash(head.Hash()) if receipts == nil { log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash()) return true } update := f.newUpdateBatch() - if err := update.initWithBlock(head, receipts); err != nil { + if err := update.initWithBlock(head, receipts, 0); err != nil { log.Error("Could not initialize log index", "error", err) } f.applyUpdateBatch(update) @@ -234,21 +234,24 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) { defer func() { if head.Hash() == f.headBlockHash { if f.loggedHeadUpdate { - log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate, - "elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate))) - f.loggedHeadUpdate, f.startHeadUpdate = false, false - } - } else { - if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate { - log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate, - "remaining", head.Number.Uint64()-f.headBlockNumber, + log.Info("Forward log indexing finished", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber, + "last block", f.headBlockNumber, "processed", f.headBlockNumber-f.ptrHeadUpdate, "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate))) - f.loggedHeadUpdate = true - f.lastLogHeadUpdate = time.Now() + f.loggedHeadUpdate, f.startHeadUpdate = false, false } } }() + printProgressLog := func() { + if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate { + log.Info("Forward log indexing in progress", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber, + "last block", f.headBlockNumber, "processed", f.headBlockNumber-f.ptrHeadUpdate, "remaining", head.Number.Uint64()-f.headBlockNumber, + "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate))) + f.loggedHeadUpdate = true + f.lastLogHeadUpdate = time.Now() + } + } + hc := newHeaderChain(f.chain, head.Number.Uint64(), head.Hash()) f.revertToCommonAncestor(head.Number.Uint64(), hc) if !f.initialized { @@ -267,6 +270,7 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) { // add new blocks update := f.newUpdateBatch() + lastHeadEpoch := update.headEpoch() for update.headBlockNumber < head.Number.Uint64() { header := hc.getHeader(update.headBlockNumber + 1) if header == nil { @@ -282,24 +286,48 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) { log.Error("Error adding new block", "number", header.Number, "hash", header.Hash(), "error", err) break } - if update.updatedRangeLength() >= f.mapsPerEpoch { + if update.headBlockNumber+cachedRevertPoints > head.Number.Uint64() || + update.headBlockNumber%revertPointFrequency == 0 { + if rp, err := update.makeRevertPoint(); err != nil { + log.Error("Error creating revert point", "block number", update.headBlockNumber, "error", err) + } else if rp != nil { + update.revertPoints[update.headBlockNumber] = rp + } + } + newHead := headFn() + if newHead == nil { + f.applyUpdateBatch(update) + printProgressLog() + return + } + if newHead.Hash() != head.Hash() { + head = newHead + hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash()) + if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash { + f.applyUpdateBatch(update) + printProgressLog() + f.revertToCommonAncestor(head.Number.Uint64(), hc) + if !f.initialized { + return + } + update = f.newUpdateBatch() + } + } + if headEpoch := update.headEpoch(); headEpoch > lastHeadEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) - newHead := headFn() - if newHead == nil { - return + // after adding 1 epoch of new log data remove at most 2 epochs of + // unwanted tail data if necessary + tailTarget := f.tailTarget() + if f.tailBlockNumber < tailTarget { + f.unindexTailEpoch(tailTarget) } - if newHead.Hash() != head.Hash() { - head = newHead - hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash()) - if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash { - f.revertToCommonAncestor(head.Number.Uint64(), hc) - if !f.initialized { - return - } - } + if f.tailBlockNumber < tailTarget { + f.unindexTailEpoch(tailTarget) } + printProgressLog() update = f.newUpdateBatch() + lastHeadEpoch = headEpoch } } f.applyUpdateBatch(update) @@ -307,6 +335,9 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) { // find the latest revert point that is the ancestor of the new head func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) { + if hc.getBlockHash(f.headBlockNumber) == f.headBlockHash { + return + } var ( number = headNum rp *revertPoint @@ -332,9 +363,6 @@ func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) { f.setRange(f.db, filterMapsRange{}) return } - if rp.blockHash == f.headBlockHash { - return // found the head revert point, nothing to do - } // revert to the common ancestor if necessary if rp.blockNumber+128 <= f.headBlockNumber { log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", rp.blockNumber) @@ -349,13 +377,8 @@ func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) { // stopFn is called regularly during the process, and if it returns true, the // latest batch is written and the function returns. // tryUpdateTail returns true if it has reached the desired history length. -func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool { - var tailTarget uint64 - if f.history > 0 { - if headNum := head.Number.Uint64(); headNum >= f.history { - tailTarget = headNum + 1 - f.history - } - } +func (f *FilterMaps) tryUpdateTail(stopFn func() bool) bool { + tailTarget := f.tailTarget() tailNum := f.tailBlockNumber if tailNum > tailTarget { if !f.tryExtendTail(tailTarget, stopFn) { @@ -368,14 +391,24 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool return true } +// tailTarget returns the target value for the tail block number according to the +// log history parameter and the current index head. +func (f *FilterMaps) tailTarget() uint64 { + if f.history == 0 || f.headBlockNumber < f.history { + return 0 + } + return f.headBlockNumber + 1 - f.history +} + // tryExtendTail attempts to extend the log index backwards until the desired // indexed history length is achieved. Returns true if finished. func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { defer func() { if f.tailBlockNumber <= tailTarget { if f.loggedTailExtend { - log.Info("Reverse log indexing finished", "maps", f.mapCount(f.logValuesPerMap), "history", f.headBlockNumber+1-f.tailBlockNumber, - "processed", f.ptrTailExtend-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend))) + log.Info("Reverse log indexing finished", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber, + "last block", f.headBlockNumber, "processed", f.ptrTailExtend-f.tailBlockNumber, + "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend))) f.loggedTailExtend = false } } @@ -397,8 +430,8 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { f.applyUpdateBatch(update) if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend { - log.Info("Reverse log indexing in progress", "maps", update.mapCount(f.logValuesPerMap), "history", update.headBlockNumber+1-update.tailBlockNumber, - "processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget, + log.Info("Reverse log indexing in progress", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber, + "last block", f.headBlockNumber, "processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", f.tailBlockNumber-tailTarget, "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend))) f.loggedTailExtend = true f.lastLogTailExtend = time.Now() @@ -710,6 +743,11 @@ func (u *updateBatch) updatedRangeLength() uint32 { return u.afterLastMap - u.firstMap } +// headEpoch returns the head epoch index. +func (u *updateBatch) headEpoch() uint32 { + return uint32(u.headLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch)) +} + // tailEpoch returns the tail epoch index. func (u *updateBatch) tailEpoch() uint32 { return uint32(u.tailBlockLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch)) @@ -745,12 +783,11 @@ func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) { } // initWithBlock initializes the log index with the given block as head. -func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts) error { +func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts, startLvPointer uint64) error { if u.initialized { return errors.New("already initialized") } u.initialized = true - startLvPointer := uint64(startLvMap) << u.f.logValuesPerMap u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash @@ -795,11 +832,6 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip if (u.headBlockNumber-cachedRevertPoints)%revertPointFrequency != 0 { delete(u.revertPoints, u.headBlockNumber-cachedRevertPoints) } - if rp, err := u.makeRevertPoint(); err != nil { - return err - } else if rp != nil { - u.revertPoints[u.headBlockNumber] = rp - } return nil }