Zsolt Felfoldi 2 months ago
parent 3ca2602aa4
commit 5a12b94cc7
  1. 18
      core/filtermaps/filtermaps.go
  2. 68
      core/filtermaps/indexer.go

@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
@ -28,6 +29,13 @@ const (
headCacheSize = 8 // maximum number of recent filter maps cached in memory
)
type blockchain interface {
CurrentBlock() *types.Header
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
GetHeader(hash common.Hash, number uint64) *types.Header
GetCanonicalHash(number uint64) common.Hash
}
// FilterMaps is the in-memory representation of the log index structure that is
// responsible for building and updating the index according to the canonical
// chain.
@ -40,7 +48,12 @@ type FilterMaps struct {
closeCh chan chan struct{}
filterMapsRange
chain *core.BlockChain
chain blockchain
chainHeadLock sync.Mutex
chainHeadCh chan *types.Header
chainHead *types.Header
chainHeadCount uint64
// filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified
@ -86,7 +99,7 @@ type filterMapsRange struct {
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain.
func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
rs, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
@ -104,6 +117,7 @@ func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
headBlockHash: rs.HeadBlockHash,
tailParentHash: rs.TailParentHash,
},
chainHeadCh: make(chan *types.Header, 10),
filterMapCache: make(map[uint32]*filterMap),
blockPtrCache: lru.NewCache[uint32, uint64](1000),
lvPointerCache: lru.NewCache[uint64, uint64](1000),

@ -19,42 +19,67 @@ const (
cachedRevertPoints = 64 // revert points for most recent blocks in memory
)
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
headEventCh := make(chan core.ChainHeadEvent)
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
defer sub.Unsubscribe()
func (f *FilterMaps) UpdateHead() bool {
f.chainHeadLock.Lock()
defer f.chainHeadLock.Unlock()
head := f.chain.CurrentBlock()
if head == nil {
return false
}
if f.chainHead == nil || head.Hash() != f.chainHead.Hash() {
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case ch := <-f.closeCh:
close(ch)
return
case f.chainHeadCh <- head:
f.chainHead = head
f.chainHeadCount++
return true
default:
}
}
fmr := f.getRange()
return false
}
var stop bool
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
headEventCh := make(chan core.ChainHeadEvent)
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
defer sub.Unsubscribe()
var (
head *types.Header
stop bool
)
wait := func() {
if stop {
return
}
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-headEventCh:
if f.UpdateHead() {
head = <-f.chainHeadCh
}
case head = <-f.chainHeadCh:
case <-time.After(time.Second * 20):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
if f.UpdateHead() {
head = <-f.chainHeadCh
}
case ch := <-f.closeCh:
close(ch)
stop = true
}
}
f.UpdateHead()
for head == nil {
wait()
if stop {
return
}
}
fmr := f.getRange()
for !stop {
if !fmr.initialized {
f.tryInit(head)
@ -77,14 +102,19 @@ func (f *FilterMaps) updateLoop() {
f.tryExtendTail(func() bool {
// return true if tail processing needs to be stopped
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-headEventCh:
if f.UpdateHead() {
head = <-f.chainHeadCh
}
case head = <-f.chainHeadCh:
case ch := <-f.closeCh:
close(ch)
stop = true
return true
default:
head = f.chain.CurrentBlock()
if f.UpdateHead() {
head = <-f.chainHeadCh
}
}
// stop if there is a new chain head (always prioritize head updates)
return fmr.headBlockHash != head.Hash()

Loading…
Cancel
Save