eth/filters: implement log filter using new log index

pull/31080/head
Zsolt Felfoldi 2 months ago
parent 26671e5488
commit 86f52e7580
 cmd/geth/chaincmd.go                     |   3
 cmd/geth/main.go                         |   3
 cmd/utils/flags.go                       |  25
 core/blockchain.go                       |  14
 eth/api_backend.go                       |   5
 eth/backend.go                           |  99
 eth/ethconfig/config.go                  |   8
 eth/filters/filter.go                    | 339
 eth/filters/filter_system.go             |   5
 eth/filters/filter_system_test.go        |  59
 eth/filters/filter_test.go               | 184
 internal/ethapi/api_test.go              |   6
 internal/ethapi/backend.go               |   6
 internal/ethapi/transaction_args_test.go |  10
 triedb/pathdb/journal.go                 |   2
 15 files changed

@@ -100,6 +100,9 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
utils.VMTraceFlag,
utils.VMTraceJsonConfigFlag,
utils.TransactionHistoryFlag,
utils.LogHistoryFlag,
utils.LogNoHistoryFlag,
utils.LogExportCheckpointsFlag,
utils.StateHistoryFlag,
}, utils.DatabaseFlags),
Description: `

@@ -86,6 +86,9 @@ var (
utils.SnapshotFlag,
utils.TxLookupLimitFlag, // deprecated
utils.TransactionHistoryFlag,
utils.LogHistoryFlag,
utils.LogNoHistoryFlag,
utils.LogExportCheckpointsFlag,
utils.StateHistoryFlag,
utils.LightServeFlag, // deprecated
utils.LightIngressFlag, // deprecated

@@ -272,6 +272,22 @@ var (
Value: ethconfig.Defaults.TransactionHistory,
Category: flags.StateCategory,
}
LogHistoryFlag = &cli.Uint64Flag{
Name: "history.logs",
Usage: "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)",
Value: ethconfig.Defaults.LogHistory,
Category: flags.StateCategory,
}
LogNoHistoryFlag = &cli.BoolFlag{
Name: "history.logs.disable",
Usage: "Do not maintain log search index",
Category: flags.StateCategory,
}
LogExportCheckpointsFlag = &cli.StringFlag{
Name: "history.logs.export",
Usage: "Export checkpoints to file in go source file format",
Value: "",
}
// Beacon client light sync settings
BeaconApiFlag = &cli.StringSliceFlag{
Name: "beacon.api",
@@ -1662,6 +1678,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.StateScheme = rawdb.HashScheme
log.Warn("Forcing hash state-scheme for archive mode")
}
if ctx.IsSet(LogHistoryFlag.Name) {
cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
}
if ctx.IsSet(LogNoHistoryFlag.Name) {
cfg.LogNoHistory = true
}
if ctx.IsSet(LogExportCheckpointsFlag.Name) {
cfg.LogExportCheckpoints = ctx.String(LogExportCheckpointsFlag.Name)
}
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
}
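The new flags follow the existing history.* naming scheme and are plumbed into ethconfig as shown above. For illustration, a hypothetical invocation (the values are made up; only the flag names are defined by this change):

geth --history.logs=100000 --history.logs.export=checkpoints.go
geth --history.logs.disable

--history.logs bounds how many recent blocks keep a log search index, --history.logs.disable turns the index off entirely, and --history.logs.export writes the index checkpoints out as a Go source file.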

@@ -228,6 +228,7 @@ type BlockChain struct {
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
blockProcFeedCount int
scope event.SubscriptionScope
genesisBlock *types.Block
@@ -1577,8 +1578,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if len(chain) == 0 {
return 0, nil
}
// Do a sanity check that the provided chain is actually ordered and linked.
for i := 1; i < len(chain); i++ {
@@ -1618,6 +1617,17 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
if bc.insertStopped() {
return nil, 0, nil
}
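// blockProcFeed is reference counted here: only the outermost insertChain
// call emits the busy/idle signals, so recursive invocations (e.g. sidechain
// imports) do not send duplicate block processing events.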
if bc.blockProcFeedCount == 0 {
bc.blockProcFeed.Send(true)
}
bc.blockProcFeedCount++
defer func() {
bc.blockProcFeedCount--
if bc.blockProcFeedCount == 0 {
bc.blockProcFeed.Send(false)
}
}()
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)

@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
@@ -411,6 +412,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma
}
}
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend()
}
func (b *EthAPIBackend) Engine() consensus.Engine {
return b.eth.engine
}

@@ -23,6 +23,7 @@ import (
"math/big"
"runtime"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
@@ -85,6 +87,9 @@ type Ethereum struct {
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}
filterMaps *filtermaps.FilterMaps
closeFilterMaps chan chan struct{}
APIBackend *EthAPIBackend
miner *miner.Miner
@@ -222,6 +227,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
eth.bloomIndexer.Start(eth.blockchain)
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.newChainView(eth.blockchain.CurrentBlock()), filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory, config.LogExportCheckpoints)
eth.closeFilterMaps = make(chan chan struct{})
if config.BlobPool.Datadir != "" {
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
@@ -364,9 +371,97 @@ func (s *Ethereum) Start() error {
// Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers)
// start log indexer
s.filterMaps.Start()
go s.updateFilterMapsHeads()
return nil
}
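// newChainView returns a filtermaps chain view anchored at the given head
// header, or nil if no head is available yet.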
func (s *Ethereum) newChainView(head *types.Header) *filtermaps.StoredChainView {
if head == nil {
return nil
}
return filtermaps.NewStoredChainView(s.blockchain, head.Number.Uint64(), head.Hash())
}
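// updateFilterMapsHeads feeds the log indexer with the current chain head and
// the block processing state. It never blocks the producers: while waiting to
// deliver an update it keeps draining head and block processing events, and a
// ten second timeout re-reads the current head as a safety net.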
func (s *Ethereum) updateFilterMapsHeads() {
headEventCh := make(chan core.ChainEvent, 10)
blockProcCh := make(chan bool, 10)
sub := s.blockchain.SubscribeChainEvent(headEventCh)
sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh)
defer func() {
sub.Unsubscribe()
sub2.Unsubscribe()
for {
select {
case <-headEventCh:
case <-blockProcCh:
default:
return
}
}
}()
head := s.blockchain.CurrentBlock()
targetView := s.newChainView(head) // nil if already sent to channel
var blockProc, lastBlockProc bool
setHead := func(newHead *types.Header) {
if newHead == nil {
return
}
if head == nil || newHead.Hash() != head.Hash() {
head = newHead
targetView = s.newChainView(head)
}
}
for {
if blockProc != lastBlockProc {
select {
case s.filterMaps.BlockProcessingCh <- blockProc:
lastBlockProc = blockProc
case ev := <-headEventCh:
setHead(ev.Header)
case blockProc = <-blockProcCh:
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case ch := <-s.closeFilterMaps:
close(ch)
return
}
} else if targetView != nil {
select {
case s.filterMaps.TargetViewCh <- targetView:
targetView = nil
case ev := <-headEventCh:
setHead(ev.Header)
case blockProc = <-blockProcCh:
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case ch := <-s.closeFilterMaps:
close(ch)
return
}
} else {
select {
case ev := <-headEventCh:
setHead(ev.Header)
case <-time.After(time.Second * 10):
setHead(s.blockchain.CurrentBlock())
case blockProc = <-blockProcCh:
case ch := <-s.closeFilterMaps:
close(ch)
return
}
}
}
}
func (s *Ethereum) setupDiscovery() error {
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
@@ -409,6 +504,10 @@ func (s *Ethereum) Stop() error {
// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
ch := make(chan struct{})
s.closeFilterMaps <- ch
<-ch
s.filterMaps.Stop()
s.txPool.Close()
s.blockchain.Stop()
s.engine.Close()

@@ -52,6 +52,7 @@ var Defaults = Config{
NetworkId: 0, // enable auto configuration of networkID == chainID
TxLookupLimit: 2350000,
TransactionHistory: 2350000,
LogHistory: 2350000,
StateHistory: params.FullImmutabilityThreshold,
DatabaseCache: 512,
TrieCleanCache: 154,
@@ -93,8 +94,11 @@ type Config struct {
// Deprecated: use 'TransactionHistory' instead.
TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained.
LogNoHistory bool `toml:",omitempty"` // No log search index is maintained.
LogExportCheckpoints string // export log index checkpoints to file
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
// State scheme represents the scheme used to store ethereum states and trie
// nodes on top. It can be 'hash', 'path', or none which means use the scheme

@@ -19,12 +19,15 @@ package filters
import (
"context"
"errors"
"math"
"math/big"
"slices"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -38,36 +41,14 @@ type Filter struct {
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
rangeLogsTestHook chan rangeLogsTestEvent
}
// NewRangeFilter creates a new filter which searches the given block range
// for logs matching the filter criteria.
func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a range filter
filter := newFilter(sys, addresses, topics)
filter.begin = begin
filter.end = end
@@ -113,161 +94,259 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return nil, errPendingLogsUnsupported
}
resolveSpecial := func(number int64) (uint64, error) {
switch number {
case rpc.LatestBlockNumber.Int64():
// when searching from and/or until the current head, we resolve it
// to MaxUint64 which is translated by rangeLogs to the actual head
// in each iteration, ensuring that the head block will be searched
// even if the chain is updated during search.
return math.MaxUint64, nil
case rpc.FinalizedBlockNumber.Int64():
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
if hdr == nil {
return 0, errors.New("finalized header not found")
}
return hdr.Number.Uint64(), nil
case rpc.SafeBlockNumber.Int64():
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber)
if hdr == nil {
return 0, errors.New("safe header not found")
}
return hdr.Number.Uint64(), nil
}
if number < 0 {
return 0, errors.New("negative block number")
}
return uint64(number), nil
}
// range queries need to resolve the special begin/end block numbers
begin, err := resolveSpecial(f.begin)
if err != nil {
return nil, err
}
end, err := resolveSpecial(f.end)
if err != nil {
return nil, err
}
return f.rangeLogs(ctx, begin, end)
}
const (
rangeLogsTestSync = iota
rangeLogsTestTrimmed
rangeLogsTestIndexed
rangeLogsTestUnindexed
rangeLogsTestDone
)
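// rangeLogsTestEvent is sent on rangeLogsTestHook (when set) so that tests
// can observe the individual steps of a range log search.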
type rangeLogsTestEvent struct {
event int
begin, end uint64
}
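// rangeLogs returns the logs matching the filter criteria in the given block
// range. It uses the log index where available and falls back to unindexed
// search otherwise; the search is repeated and the match set trimmed until a
// result consistent with a single valid chain view can be returned.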
func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) {
if f.rangeLogsTestHook != nil {
defer func() {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0}
close(f.rangeLogsTestHook)
}()
}
if firstBlock > lastBlock {
return nil, nil
}
mb := f.sys.backend.NewMatcherBackend()
defer mb.Close()
// enforce a consistent state before starting the search in order to be able
// to determine the valid range later
syncRange, err := mb.SyncLogIndex(ctx)
if err != nil {
return nil, err
}
if !syncRange.Indexed {
// fallback to completely unindexed search
headNum := syncRange.HeadNumber
if firstBlock > headNum {
firstBlock = headNum
}
if lastBlock > headNum {
lastBlock = headNum
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, firstBlock, lastBlock}
}
return f.unindexedLogs(ctx, firstBlock, lastBlock)
}
headBlock := syncRange.HeadNumber // Head is guaranteed != nil
// if haveMatches == true then matches correspond to the block number range
// between matchFirst and matchLast
var (
matches []*types.Log
haveMatches, forceUnindexed bool
matchFirst, matchLast uint64
)
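// trimMatches removes any matches outside the given block number range and
// adjusts matchFirst/matchLast accordingly.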
trimMatches := func(trimFirst, trimLast uint64) {
if !haveMatches {
return
}
if trimLast < matchFirst || trimFirst > matchLast {
matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0
return
}
if trimFirst > matchFirst {
for len(matches) > 0 && matches[0].BlockNumber < trimFirst {
matches = matches[1:]
}
matchFirst = trimFirst
}
if trimLast < matchLast {
for len(matches) > 0 && matches[len(matches)-1].BlockNumber > trimLast {
matches = matches[:len(matches)-1]
}
matchLast = trimLast
}
}
for {
// determine range to be searched; for simplicity we only extend the most
// recent end of the existing match set by matching between searchFirst
// and searchLast.
searchFirst, searchLast := firstBlock, lastBlock
if searchFirst > headBlock {
searchFirst = headBlock
}
if searchLast > headBlock {
searchLast = headBlock
}
trimMatches(searchFirst, searchLast)
if haveMatches && matchFirst == searchFirst && matchLast == searchLast {
return matches, nil
}
var trimTailIfNotValid uint64
if haveMatches && matchFirst > searchFirst {
// missing tail section; do unindexed search
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, matchFirst - 1}
}
tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1)
if err != nil {
return matches, err
}
matches = append(tailMatches, matches...)
matchFirst = searchFirst
// unindexed results are not affected by valid tail; do not trim tail
trimTailIfNotValid = math.MaxUint64
} else {
// if we have matches, they start at searchFirst
if haveMatches {
searchFirst = matchLast + 1
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst {
forceUnindexed = true
}
}
var newMatches []*types.Log
if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst {
forceUnindexed = true
}
if !forceUnindexed {
if syncRange.FirstIndexed > searchFirst {
searchFirst = syncRange.FirstIndexed
}
if syncRange.LastIndexed < searchLast {
searchLast = syncRange.LastIndexed
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, searchFirst, searchLast}
}
newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast)
// trim tail if it affects the indexed search range
trimTailIfNotValid = searchFirst
if err == filtermaps.ErrMatchAll {
// "match all" filters are not supported by filtermaps; fall back
// to unindexed search which is the most efficient in this case
forceUnindexed = true
}
}
if forceUnindexed {
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, searchLast}
}
newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast)
// unindexed results are not affected by valid tail; do not trim tail
trimTailIfNotValid = math.MaxUint64
}
if err != nil {
return matches, err
}
if !haveMatches {
matches = newMatches
haveMatches, matchFirst, matchLast = true, searchFirst, searchLast
} else {
matches = append(matches, newMatches...)
matchLast = searchLast
}
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: matchFirst, end: matchLast}
}
syncRange, err = mb.SyncLogIndex(ctx)
if err != nil {
return matches, err
}
headBlock = syncRange.HeadNumber // Head is guaranteed != nil
if !syncRange.Valid {
matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0
} else {
if syncRange.FirstValid > trimTailIfNotValid {
trimMatches(syncRange.FirstValid, syncRange.LastValid)
} else {
trimMatches(0, syncRange.LastValid)
}
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: matchFirst, end: matchLast}
}
}
}
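// indexedLogs queries the log index for potential matches in the given block
// range and filters out the false positives, returning only logs that truly
// match the filter criteria.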
func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) {
start := time.Now()
potentialMatches, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics)
matches := filterLogs(potentialMatches, nil, nil, f.addresses, f.topics)
log.Trace("Performed indexed log search", "begin", begin, "end", end, "true matches", len(matches), "false positives", len(potentialMatches)-len(matches), "elapsed", common.PrettyDuration(time.Since(start)))
return matches, err
}
// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
start := time.Now()
log.Warn("Performing unindexed log search", "begin", begin, "end", end)
var matches []*types.Log
for blockNumber := begin; blockNumber <= end; blockNumber++ {
select {
case <-ctx.Done():
return matches, ctx.Err()
default:
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber))
if header == nil || err != nil {
return matches, err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return matches, err
}
matches = append(matches, found...)
}
log.Trace("Performed unindexed log search", "begin", begin, "end", end, "matches", len(matches), "elapsed", common.PrettyDuration(time.Since(start)))
return matches, nil
}
// blockLogs returns the logs matching the filter criteria within a single block.

@@ -29,7 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -69,8 +69,7 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
NewMatcherBackend() filtermaps.MatcherBackend
}
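// Note: backends now hand the filter system a filtermaps matcher backend
// instead of servicing bloombits sessions (see EthAPIBackend.NewMatcherBackend).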
// FilterSystem holds resources shared by all filters.

@@ -20,7 +20,6 @@ import (
"context"
"errors"
"math/big"
"math/rand"
"reflect"
"runtime"
"testing"
@@ -29,7 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
@@ -41,7 +40,7 @@ import (
type testBackend struct {
db ethdb.Database
fm *filtermaps.FilterMaps
txFeed event.Feed
logsFeed event.Feed
rmLogsFeed event.Feed
@@ -59,10 +58,28 @@ func (b *testBackend) CurrentHeader() *types.Header {
return hdr
}
func (b *testBackend) CurrentBlock() *types.Header {
return b.CurrentHeader()
}
func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}
func (b *testBackend) GetCanonicalHash(number uint64) common.Hash {
return rawdb.ReadCanonicalHash(b.db, number)
}
func (b *testBackend) GetHeader(hash common.Hash, number uint64) *types.Header {
hdr, _ := b.HeaderByHash(context.Background(), hash)
return hdr
}
func (b *testBackend) GetReceiptsByHash(hash common.Hash) types.Receipts {
r, _ := b.GetReceipts(context.Background(), hash)
return r
}
func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
var (
hash common.Hash
@@ -137,35 +154,19 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.fm.NewMatcherBackend()
}
func (b *testBackend) startFilterMaps(history uint64, noHistory bool) {
b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, 1, noHistory)
b.fm.Start()
b.fm.WaitIdle()
}
func (b *testBackend) stopFilterMaps() {
b.fm.Stop()
b.fm = nil
}
func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {

@@ -46,15 +46,27 @@ func makeReceipt(addr common.Address) *types.Receipt {
return receipt
}
func BenchmarkFiltersIndexed(b *testing.B) {
benchmarkFilters(b, 0, false)
}
func BenchmarkFiltersHalfIndexed(b *testing.B) {
benchmarkFilters(b, 50000, false)
}
func BenchmarkFiltersUnindexed(b *testing.B) {
benchmarkFilters(b, 0, true)
}
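// benchmarkFilters runs the filter benchmark with the given log index history
// length (0 = index the entire chain); noHistory == true exercises the pure
// unindexed search path.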
func benchmarkFilters(b *testing.B, history uint64, noHistory bool) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(b, db, Config{})
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
addr3 = common.BytesToAddress([]byte("ethereum"))
addr4 = common.BytesToAddress([]byte("random addresses please"))
gspec = &core.Genesis{
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000)}},
@@ -94,9 +106,12 @@ func BenchmarkFilters(b *testing.B) {
rawdb.WriteHeadBlockHash(db, block.Hash())
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
}
backend.startFilterMaps(history, noHistory)
defer backend.stopFilterMaps()
b.ResetTimer()
filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
filter.begin = 0
@@ -107,7 +122,19 @@
}
}
func TestFiltersIndexed(t *testing.T) {
testFilters(t, 0, false)
}
func TestFiltersHalfIndexed(t *testing.T) {
testFilters(t, 500, false)
}
func TestFiltersUnindexed(t *testing.T) {
testFilters(t, 0, true)
}
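// testFilters runs the filter tests against a fully indexed chain (history ==
// 0), a partially indexed one (only the most recent history blocks indexed)
// and a completely unindexed one (noHistory == true).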
func testFilters(t *testing.T, history uint64, noHistory bool) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
@@ -279,6 +306,9 @@ })
})
backend.setPending(pchain[0], preceipts[0])
backend.startFilterMaps(history, noHistory)
defer backend.stopFilterMaps()
for i, tc := range []struct {
f *Filter
want string
@@ -387,3 +417,137 @@ }
}
})
}
func TestRangeLogs(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
gspec = &core.Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{},
BaseFee: big.NewInt(params.InitialBaseFee),
}
)
_, err := gspec.Commit(db, triedb.NewDatabase(db, nil))
if err != nil {
t.Fatal(err)
}
chain, _ := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {})
var l uint64
bc, err := core.NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, &l)
if err != nil {
t.Fatal(err)
}
_, err = bc.InsertChain(chain[:600])
if err != nil {
t.Fatal(err)
}
backend.startFilterMaps(200, false)
defer backend.stopFilterMaps()
var (
testCase, event int
filter *Filter
addresses = []common.Address{common.Address{}}
)
newFilter := func(begin, end int64) {
testCase++
event = 0
filter = sys.NewRangeFilter(begin, end, addresses, nil)
filter.rangeLogsTestHook = make(chan rangeLogsTestEvent)
go func(filter *Filter) {
filter.Logs(context.Background())
// ensure that filter will not be blocked if we exit early
for range filter.rangeLogsTestHook {
}
}(filter)
}
expEvent := func(exp rangeLogsTestEvent) {
event++
ev := <-filter.rangeLogsTestHook
if ev != exp {
t.Fatalf("Test case #%d: wrong test event #%d received (got %v, expected %v)", testCase, event, ev, exp)
}
}
// test case #1
newFilter(300, 500)
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400})
if _, err := bc.InsertChain(chain[600:700]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
// test case #2
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700})
if _, err := bc.InsertChain(chain[700:800]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 701, 800})
if err := bc.SetHead(750); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
// test case #3
newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
if err := bc.SetHead(740); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740})
if _, err := bc.InsertChain(chain[740:750]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740})
// trimmed at the beginning of the next iteration
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 740, 740})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
// test case #4
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550})
if _, err := bc.InsertChain(chain[750:1000]); err != nil {
t.Fatal(err)
}
backend.fm.WaitIdle()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750})
// indexed range affected by tail pruning so we have to discard the entire
// match set
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000})
}

@@ -45,7 +45,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -620,11 +620,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
panic("implement me")
}
func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
panic("implement me")
}
func TestEstimateGas(t *testing.T) {
t.Parallel()
// Initialize test accounts

@@ -27,7 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -93,8 +93,8 @@ GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
NewMatcherBackend() filtermaps.MatcherBackend
}
func GetAPIs(apiBackend Backend) []rpc.API {

@@ -30,7 +30,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -393,12 +393,12 @@ func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction,
func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return nil, nil
}
func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil }
func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil }
func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return nil
}
func (b *backendMock) Engine() consensus.Engine { return nil }
func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil }

@@ -46,7 +46,7 @@ var (
// - Version 1: storage.Incomplete field is removed
// - Version 2: add post-modification state values
// - Version 3: a flag has been added to indicate whether the storage slot key is the raw key or a hash
const journalVersion uint64 = 4 // TODO: document the version 4 change in the list above
// loadJournal tries to parse the layer journal from the disk.
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
