|
|
|
@ -22,6 +22,14 @@ const ( |
|
|
|
|
// updateLoop initializes and updates the log index structure according to the
|
|
|
|
|
// canonical chain.
|
|
|
|
|
func (f *FilterMaps) updateLoop() { |
|
|
|
|
defer f.closeWg.Done() |
|
|
|
|
f.updateMapCache() |
|
|
|
|
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil { |
|
|
|
|
f.revertPoints[rp.blockNumber] = rp |
|
|
|
|
} else { |
|
|
|
|
log.Error("Error creating head revert point", "error", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
headEventCh = make(chan core.ChainHeadEvent) |
|
|
|
|
sub = f.chain.SubscribeChainHeadEvent(headEventCh) |
|
|
|
@ -54,8 +62,7 @@ func (f *FilterMaps) updateLoop() { |
|
|
|
|
case <-time.After(time.Second * 20): |
|
|
|
|
// keep updating log index during syncing
|
|
|
|
|
head = f.chain.CurrentBlock() |
|
|
|
|
case ch := <-f.closeCh: |
|
|
|
|
close(ch) |
|
|
|
|
case <-f.closeCh: |
|
|
|
|
stop = true |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -69,7 +76,10 @@ func (f *FilterMaps) updateLoop() { |
|
|
|
|
|
|
|
|
|
for !stop { |
|
|
|
|
if !fmr.initialized { |
|
|
|
|
f.tryInit(head) |
|
|
|
|
if !f.tryInit(head) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if syncMatcher != nil { |
|
|
|
|
syncMatcher.synced(head) |
|
|
|
|
syncMatcher = nil |
|
|
|
@ -82,7 +92,9 @@ func (f *FilterMaps) updateLoop() { |
|
|
|
|
} |
|
|
|
|
// log index is initialized
|
|
|
|
|
if fmr.headBlockHash != head.Hash() { |
|
|
|
|
f.tryUpdateHead(head) |
|
|
|
|
if !f.tryUpdateHead(head) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
fmr = f.getRange() |
|
|
|
|
if fmr.headBlockHash != head.Hash() { |
|
|
|
|
wait() |
|
|
|
@ -101,8 +113,7 @@ func (f *FilterMaps) updateLoop() { |
|
|
|
|
head = ev.Block.Header() |
|
|
|
|
case syncMatcher = <-f.matcherSyncCh: |
|
|
|
|
head = f.chain.CurrentBlock() |
|
|
|
|
case ch := <-f.closeCh: |
|
|
|
|
close(ch) |
|
|
|
|
case <-f.closeCh: |
|
|
|
|
stop = true |
|
|
|
|
return true |
|
|
|
|
default: |
|
|
|
@ -128,24 +139,34 @@ func (f *FilterMaps) getRange() filterMapsRange { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// tryInit attempts to initialize the log index structure.
|
|
|
|
|
func (f *FilterMaps) tryInit(head *types.Header) { |
|
|
|
|
// Returns false if indexer was stopped during a database reset. In this case the
|
|
|
|
|
// indexer should exit and remaining parts of the old database will be removed
|
|
|
|
|
// at next startup.
|
|
|
|
|
func (f *FilterMaps) tryInit(head *types.Header) bool { |
|
|
|
|
if !f.reset() { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64()) |
|
|
|
|
if receipts == nil { |
|
|
|
|
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash()) |
|
|
|
|
return |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
update := f.newUpdateBatch() |
|
|
|
|
if err := update.initWithBlock(head, receipts); err != nil { |
|
|
|
|
log.Error("Could not initialize log index", "error", err) |
|
|
|
|
} |
|
|
|
|
f.applyUpdateBatch(update) |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// tryUpdateHead attempts to update the log index with a new head. If necessary,
|
|
|
|
|
// it reverts to a common ancestor with the old head before adding new block logs.
|
|
|
|
|
// If no suitable revert point is available (probably a reorg just after init)
|
|
|
|
|
// then it resets the index and tries to re-initialize with the new head.
|
|
|
|
|
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { |
|
|
|
|
// Returns false if indexer was stopped during a database reset. In this case the
|
|
|
|
|
// indexer should exit and remaining parts of the old database will be removed
|
|
|
|
|
// at next startup.
|
|
|
|
|
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { |
|
|
|
|
// iterate back from new head until the log index head or a revert point and
|
|
|
|
|
// collect headers of blocks to be added
|
|
|
|
|
var ( |
|
|
|
@ -159,14 +180,12 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { |
|
|
|
|
rp, err = f.getRevertPoint(chainPtr.Number.Uint64()) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err) |
|
|
|
|
return |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
if rp == nil { |
|
|
|
|
// there are no more revert points available so we should reset and re-initialize
|
|
|
|
|
log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64()) |
|
|
|
|
f.reset() |
|
|
|
|
f.tryInit(newHead) |
|
|
|
|
return |
|
|
|
|
return f.tryInit(newHead) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if chainPtr.Hash() == rp.blockHash { |
|
|
|
@ -178,7 +197,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { |
|
|
|
|
chainPtr = f.chain.GetHeader(chainPtr.ParentHash, chainPtr.Number.Uint64()-1) |
|
|
|
|
if chainPtr == nil { |
|
|
|
|
log.Error("Canonical header not found", "number", chainPtr.Number.Uint64()-1, "hash", chainPtr.ParentHash) |
|
|
|
|
return |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if rp.blockHash != f.headBlockHash { |
|
|
|
@ -187,12 +206,12 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { |
|
|
|
|
} |
|
|
|
|
if err := f.revertTo(rp); err != nil { |
|
|
|
|
log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err) |
|
|
|
|
return |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if newHeaders == nil { |
|
|
|
|
return |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
// add logs of new blocks in reverse order
|
|
|
|
|
update := f.newUpdateBatch() |
|
|
|
@ -214,6 +233,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
f.applyUpdateBatch(update) |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// tryExtendTail attempts to extend the log index backwards until it indexes the
|
|
|
|
|