From 86f52e7580851f02b0737e03c04647ae30bd4a6b Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sat, 14 Dec 2024 16:16:52 +0100 Subject: [PATCH] eth/filters: implement log filter using new log index --- cmd/geth/chaincmd.go | 3 + cmd/geth/main.go | 3 + cmd/utils/flags.go | 25 ++ core/blockchain.go | 14 +- eth/api_backend.go | 5 + eth/backend.go | 99 +++++++ eth/ethconfig/config.go | 8 +- eth/filters/filter.go | 339 ++++++++++++++--------- eth/filters/filter_system.go | 5 +- eth/filters/filter_system_test.go | 59 ++-- eth/filters/filter_test.go | 184 +++++++++++- internal/ethapi/api_test.go | 6 +- internal/ethapi/backend.go | 6 +- internal/ethapi/transaction_args_test.go | 10 +- triedb/pathdb/journal.go | 2 +- 15 files changed, 579 insertions(+), 189 deletions(-) diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index bbadb1cc19..a39ebcc02f 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -100,6 +100,9 @@ if one is set. Otherwise it prints the genesis from the datadir.`, utils.VMTraceFlag, utils.VMTraceJsonConfigFlag, utils.TransactionHistoryFlag, + utils.LogHistoryFlag, + utils.LogNoHistoryFlag, + utils.LogExportCheckpointsFlag, utils.StateHistoryFlag, }, utils.DatabaseFlags), Description: ` diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 9d9256862b..19c1af48a6 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -86,6 +86,9 @@ var ( utils.SnapshotFlag, utils.TxLookupLimitFlag, // deprecated utils.TransactionHistoryFlag, + utils.LogHistoryFlag, + utils.LogNoHistoryFlag, + utils.LogExportCheckpointsFlag, utils.StateHistoryFlag, utils.LightServeFlag, // deprecated utils.LightIngressFlag, // deprecated diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f079df83b9..2ad6104d42 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -272,6 +272,22 @@ var ( Value: ethconfig.Defaults.TransactionHistory, Category: flags.StateCategory, } + LogHistoryFlag = &cli.Uint64Flag{ + Name: "history.logs", + Usage: "Number of recent blocks 
to maintain log search index for (default = about one year, 0 = entire chain)", + Value: ethconfig.Defaults.LogHistory, + Category: flags.StateCategory, + } + LogNoHistoryFlag = &cli.BoolFlag{ + Name: "history.logs.disable", + Usage: "Do not maintain log search index", + Category: flags.StateCategory, + } + LogExportCheckpointsFlag = &cli.StringFlag{ + Name: "history.logs.export", + Usage: "Export checkpoints to file in go source file format", + Value: "", + } // Beacon client light sync settings BeaconApiFlag = &cli.StringSliceFlag{ Name: "beacon.api", @@ -1662,6 +1678,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.StateScheme = rawdb.HashScheme log.Warn("Forcing hash state-scheme for archive mode") } + if ctx.IsSet(LogHistoryFlag.Name) { + cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name) + } + if ctx.IsSet(LogNoHistoryFlag.Name) { + cfg.LogNoHistory = true + } + if ctx.IsSet(LogExportCheckpointsFlag.Name) { + cfg.LogExportCheckpoints = ctx.String(LogExportCheckpointsFlag.Name) + } if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) { cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100 } diff --git a/core/blockchain.go b/core/blockchain.go index 6aac541ba0..50a7442940 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -228,6 +228,7 @@ type BlockChain struct { chainHeadFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed + blockProcFeedCount int scope event.SubscriptionScope genesisBlock *types.Block @@ -1577,8 +1578,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { if len(chain) == 0 { return 0, nil } - bc.blockProcFeed.Send(true) - defer bc.blockProcFeed.Send(false) // Do a sanity check that the provided chain is actually ordered and linked. 
for i := 1; i < len(chain); i++ { @@ -1618,6 +1617,17 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness if bc.insertStopped() { return nil, 0, nil } + if bc.blockProcFeedCount == 0 { + bc.blockProcFeed.Send(true) + } + bc.blockProcFeedCount++ + defer func() { + bc.blockProcFeedCount-- + if bc.blockProcFeedCount == 0 { + bc.blockProcFeed.Send(false) + } + }() + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain) diff --git a/eth/api_backend.go b/eth/api_backend.go index be2101c6ec..b7dfaad4c4 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" @@ -411,6 +412,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma } } +func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend { + return b.eth.filterMaps.NewMatcherBackend() +} + func (b *EthAPIBackend) Engine() consensus.Engine { return b.eth.engine } diff --git a/eth/backend.go b/eth/backend.go index a3aa0a7b9b..8e7a44b5cf 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -23,6 +23,7 @@ import ( "math/big" "runtime" "sync" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -30,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/pruner" 
"github.com/ethereum/go-ethereum/core/txpool" @@ -85,6 +87,9 @@ type Ethereum struct { bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports closeBloomHandler chan struct{} + filterMaps *filtermaps.FilterMaps + closeFilterMaps chan chan struct{} + APIBackend *EthAPIBackend miner *miner.Miner @@ -222,6 +227,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } eth.bloomIndexer.Start(eth.blockchain) + eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.newChainView(eth.blockchain.CurrentBlock()), filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory, config.LogExportCheckpoints) + eth.closeFilterMaps = make(chan chan struct{}) if config.BlobPool.Datadir != "" { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) @@ -364,9 +371,97 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) + + // start log indexer + s.filterMaps.Start() + go s.updateFilterMapsHeads() return nil } +func (s *Ethereum) newChainView(head *types.Header) *filtermaps.StoredChainView { + if head == nil { + return nil + } + return filtermaps.NewStoredChainView(s.blockchain, head.Number.Uint64(), head.Hash()) +} + +func (s *Ethereum) updateFilterMapsHeads() { + headEventCh := make(chan core.ChainEvent, 10) + blockProcCh := make(chan bool, 10) + sub := s.blockchain.SubscribeChainEvent(headEventCh) + sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh) + defer func() { + sub.Unsubscribe() + sub2.Unsubscribe() + for { + select { + case <-headEventCh: + case <-blockProcCh: + default: + return + } + } + }() + + head := s.blockchain.CurrentBlock() + targetView := s.newChainView(head) // nil if already sent to channel + var blockProc, lastBlockProc bool + + setHead := func(newHead *types.Header) { + if newHead == nil { + return + } + if head == nil || newHead.Hash() != head.Hash() { + head = newHead + targetView = s.newChainView(head) + } + 
} + + for { + if blockProc != lastBlockProc { + select { + case s.filterMaps.BlockProcessingCh <- blockProc: + lastBlockProc = blockProc + case ev := <-headEventCh: + setHead(ev.Header) + case blockProc = <-blockProcCh: + //fmt.Println("block proc feed", blockProc) + case <-time.After(time.Second * 10): + setHead(s.blockchain.CurrentBlock()) + case ch := <-s.closeFilterMaps: + close(ch) + return + } + } else if targetView != nil { + select { + case s.filterMaps.TargetViewCh <- targetView: + targetView = nil + case ev := <-headEventCh: + setHead(ev.Header) + case blockProc = <-blockProcCh: + //fmt.Println("block proc feed", blockProc) + case <-time.After(time.Second * 10): + setHead(s.blockchain.CurrentBlock()) + case ch := <-s.closeFilterMaps: + close(ch) + return + } + } else { + select { + case ev := <-headEventCh: + setHead(ev.Header) + case <-time.After(time.Second * 10): + setHead(s.blockchain.CurrentBlock()) + case blockProc = <-blockProcCh: + //fmt.Println("block proc feed", blockProc) + case ch := <-s.closeFilterMaps: + close(ch) + return + } + } + } +} + func (s *Ethereum) setupDiscovery() error { eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode()) @@ -409,6 +504,10 @@ func (s *Ethereum) Stop() error { // Then stop everything else. s.bloomIndexer.Close() close(s.closeBloomHandler) + ch := make(chan struct{}) + s.closeFilterMaps <- ch + <-ch + s.filterMaps.Stop() s.txPool.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 6b75ab816f..f8286036bf 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -52,6 +52,7 @@ var Defaults = Config{ NetworkId: 0, // enable auto configuration of networkID == chainID TxLookupLimit: 2350000, TransactionHistory: 2350000, + LogHistory: 2350000, StateHistory: params.FullImmutabilityThreshold, DatabaseCache: 512, TrieCleanCache: 154, @@ -93,8 +94,11 @@ type Config struct { // Deprecated: use 'TransactionHistory' instead. 
TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. - TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. - StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved. + TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. + LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained. + LogNoHistory bool `toml:",omitempty"` // No log search index is maintained. + LogExportCheckpoints string // export log index checkpoints to file + StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved. // State scheme represents the scheme used to store ethereum states and trie // nodes on top. It can be 'hash', 'path', or none which means use the scheme diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 09ccb93907..7819e110fb 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -19,12 +19,15 @@ package filters import ( "context" "errors" + "math" "math/big" "slices" + "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -38,36 +41,14 @@ type Filter struct { block *common.Hash // Block hash if filtering a single block begin, end int64 // Range interval if filtering multiple blocks - matcher *bloombits.Matcher + rangeLogsTestHook chan rangeLogsTestEvent } // NewRangeFilter creates a new filter which uses a bloom filter on blocks to // figure out whether a particular block is interesting or not. 
func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { - // Flatten the address and topic filter clauses into a single bloombits filter - // system. Since the bloombits are not positional, nil topics are permitted, - // which get flattened into a nil byte slice. - var filters [][][]byte - if len(addresses) > 0 { - filter := make([][]byte, len(addresses)) - for i, address := range addresses { - filter[i] = address.Bytes() - } - filters = append(filters, filter) - } - for _, topicList := range topics { - filter := make([][]byte, len(topicList)) - for i, topic := range topicList { - filter[i] = topic.Bytes() - } - filters = append(filters, filter) - } - size, _ := sys.backend.BloomStatus() - // Create a generic filter and convert it into a range filter filter := newFilter(sys, addresses, topics) - - filter.matcher = bloombits.NewMatcher(size, filters) filter.begin = begin filter.end = end @@ -113,161 +94,259 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return nil, errPendingLogsUnsupported } - resolveSpecial := func(number int64) (int64, error) { - var hdr *types.Header + resolveSpecial := func(number int64) (uint64, error) { switch number { - case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64(): - // we should return head here since we've already captured - // that we need to get the pending logs in the pending boolean above - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if hdr == nil { - return 0, errors.New("latest header not found") - } + case rpc.LatestBlockNumber.Int64(): + // when searching from and/or until the current head, we resolve it + // to MaxUint64 which is translated by rangeLogs to the actual head + // in each iteration, ensuring that the head block will be searched + // even if the chain is updated during search. 
+ return math.MaxUint64, nil case rpc.FinalizedBlockNumber.Int64(): - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) if hdr == nil { return 0, errors.New("finalized header not found") } + return hdr.Number.Uint64(), nil case rpc.SafeBlockNumber.Int64(): - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber) + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber) if hdr == nil { return 0, errors.New("safe header not found") } - default: - return number, nil + return hdr.Number.Uint64(), nil + } + if number < 0 { + return 0, errors.New("negative block number") } - return hdr.Number.Int64(), nil + return uint64(number), nil } - var err error // range query need to resolve the special begin/end block number - if f.begin, err = resolveSpecial(f.begin); err != nil { + begin, err := resolveSpecial(f.begin) + if err != nil { return nil, err } - if f.end, err = resolveSpecial(f.end); err != nil { + end, err := resolveSpecial(f.end) + if err != nil { return nil, err } - - logChan, errChan := f.rangeLogsAsync(ctx) - var logs []*types.Log - for { - select { - case log := <-logChan: - logs = append(logs, log) - case err := <-errChan: - return logs, err - } - } + return f.rangeLogs(ctx, begin, end) } -// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously, -// it creates and returns two channels: one for delivering log data, and one for reporting errors. 
-func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) { - var ( - logChan = make(chan *types.Log) - errChan = make(chan error) - ) +const ( + rangeLogsTestSync = iota + rangeLogsTestTrimmed + rangeLogsTestIndexed + rangeLogsTestUnindexed + rangeLogsTestDone +) - go func() { +type rangeLogsTestEvent struct { + event int + begin, end uint64 +} + +func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) { + if f.rangeLogsTestHook != nil { defer func() { - close(errChan) - close(logChan) + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0} + close(f.rangeLogsTestHook) }() + } - // Gather all indexed logs, and finish with non indexed ones - var ( - end = uint64(f.end) - size, sections = f.sys.backend.BloomStatus() - err error - ) - if indexed := sections * size; indexed > uint64(f.begin) { - if indexed > end { - indexed = end + 1 - } - if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil { - errChan <- err - return - } - } - - if err := f.unindexedLogs(ctx, end, logChan); err != nil { - errChan <- err - return - } - - errChan <- nil - }() - - return logChan, errChan -} + if firstBlock > lastBlock { + return nil, nil + } -// indexedLogs returns the logs matching the filter criteria based on the bloom -// bits indexed available locally or via the network. 
-func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { - // Create a matcher session and request servicing from the backend - matches := make(chan uint64, 64) + mb := f.sys.backend.NewMatcherBackend() + defer mb.Close() - session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) + // enforce a consistent state before starting the search in order to be able + // to determine valid range later + syncRange, err := mb.SyncLogIndex(ctx) if err != nil { - return err + return nil, err + } + if !syncRange.Indexed { + // fallback to completely unindexed search + headNum := syncRange.HeadNumber + if firstBlock > headNum { + firstBlock = headNum + } + if lastBlock > headNum { + lastBlock = headNum + } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, firstBlock, lastBlock} + } + return f.unindexedLogs(ctx, firstBlock, lastBlock) } - defer session.Close() - f.sys.backend.ServiceFilter(ctx, session) + headBlock := syncRange.HeadNumber // Head is guaranteed != nil + // if haveMatches == true then matches correspond to the block number range + // between matchFirst and matchLast + var ( + matches []*types.Log + haveMatches, forceUnindexed bool + matchFirst, matchLast uint64 + ) + trimMatches := func(trimFirst, trimLast uint64) { + if !haveMatches { + return + } + if trimLast < matchFirst || trimFirst > matchLast { + matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0 + return + } + if trimFirst > matchFirst { + for len(matches) > 0 && matches[0].BlockNumber < trimFirst { + matches = matches[1:] + } + matchFirst = trimFirst + } + if trimLast < matchLast { + for len(matches) > 0 && matches[len(matches)-1].BlockNumber > trimLast { + matches = matches[:len(matches)-1] + } + matchLast = trimLast + } + } for { - select { - case number, ok := <-matches: - // Abort if all matches have been fulfilled - if !ok { - err := session.Error() - if err == nil { - f.begin = int64(end) 
+ 1 + // determine range to be searched; for simplicity we only extend the most + // recent end of the existing match set by matching between searchFirst + // and searchLast. + searchFirst, searchLast := firstBlock, lastBlock + if searchFirst > headBlock { + searchFirst = headBlock + } + if searchLast > headBlock { + searchLast = headBlock + } + trimMatches(searchFirst, searchLast) + if haveMatches && matchFirst == searchFirst && matchLast == searchLast { + return matches, nil + } + var trimTailIfNotValid uint64 + if haveMatches && matchFirst > searchFirst { + // missing tail section; do unindexed search + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, matchFirst - 1} + } + tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1) + if err != nil { + return matches, err + } + matches = append(tailMatches, matches...) + matchFirst = searchFirst + // unindexed results are not affected by valid tail; do not trim tail + trimTailIfNotValid = math.MaxUint64 + } else { + // if we have matches, they start at searchFirst + if haveMatches { + searchFirst = matchLast + 1 + if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst { + forceUnindexed = true } - return err } - f.begin = int64(number) + 1 - - // Retrieve the suggested block and pull any truly matching logs - header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) - if header == nil || err != nil { - return err + var newMatches []*types.Log + if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst { + forceUnindexed = true + } + if !forceUnindexed { + if syncRange.FirstIndexed > searchFirst { + searchFirst = syncRange.FirstIndexed + } + if syncRange.LastIndexed < searchLast { + searchLast = syncRange.LastIndexed + } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, searchFirst, searchLast} + } + newMatches, err = 
f.indexedLogs(ctx, mb, searchFirst, searchLast) + // trim tail if it affects the indexed search range + trimTailIfNotValid = searchFirst + if err == filtermaps.ErrMatchAll { + // "match all" filters are not supported by filtermaps; fall back + // to unindexed search which is the most efficient in this case + forceUnindexed = true + } + } + if forceUnindexed { + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, searchLast} + } + newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast) + // unindexed results are not affected by valid tail; do not trim tail + trimTailIfNotValid = math.MaxUint64 } - found, err := f.checkMatches(ctx, header) if err != nil { - return err + return matches, err } - for _, log := range found { - logChan <- log + if !haveMatches { + matches = newMatches + haveMatches, matchFirst, matchLast = true, searchFirst, searchLast + } else { + matches = append(matches, newMatches...) + matchLast = searchLast } + } - case <-ctx.Done(): - return ctx.Err() + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: matchFirst, end: matchLast} + } + syncRange, err = mb.SyncLogIndex(ctx) + if err != nil { + return matches, err + } + headBlock = syncRange.HeadNumber // Head is guaranteed != nil + if !syncRange.Valid { + matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0 + } else { + if syncRange.FirstValid > trimTailIfNotValid { + trimMatches(syncRange.FirstValid, syncRange.LastValid) + } else { + trimMatches(0, syncRange.LastValid) + } + } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: matchFirst, end: matchLast} } } } +func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) { + start := time.Now() + potentialMatches, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, 
f.topics) + matches := filterLogs(potentialMatches, nil, nil, f.addresses, f.topics) + log.Trace("Performed indexed log search", "begin", begin, "end", end, "true matches", len(matches), "false positives", len(potentialMatches)-len(matches), "elapsed", common.PrettyDuration(time.Since(start))) + return matches, err +} + // unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. -func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { - for ; f.begin <= int64(end); f.begin++ { - header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) +func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) { + start := time.Now() + log.Warn("Performing unindexed log search", "begin", begin, "end", end) + var matches []*types.Log + for blockNumber := begin; blockNumber <= end; blockNumber++ { + select { + case <-ctx.Done(): + return matches, ctx.Err() + default: + } + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber)) if header == nil || err != nil { - return err + return matches, err } found, err := f.blockLogs(ctx, header) if err != nil { - return err - } - for _, log := range found { - select { - case logChan <- log: - case <-ctx.Done(): - return ctx.Err() - } + return matches, err } + matches = append(matches, found...) } - return nil + log.Trace("Performed unindexed log search", "begin", begin, "end", end, "matches", len(matches), "elapsed", common.PrettyDuration(time.Since(start))) + return matches, nil } // blockLogs returns the logs matching the filter criteria within a single block. 
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 86012b3f9a..7531a1ecfc 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -29,7 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -69,8 +69,7 @@ type Backend interface { SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - BloomStatus() (uint64, uint64) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + NewMatcherBackend() filtermaps.MatcherBackend } // FilterSystem holds resources shared by all filters. diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index aec5ee4166..0160daaca4 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -20,7 +20,6 @@ import ( "context" "errors" "math/big" - "math/rand" "reflect" "runtime" "testing" @@ -29,7 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -41,7 +40,7 @@ import ( type testBackend struct { db ethdb.Database - sections uint64 + fm *filtermaps.FilterMaps txFeed event.Feed logsFeed event.Feed rmLogsFeed event.Feed @@ -59,10 +58,28 @@ func (b *testBackend) CurrentHeader() *types.Header { return hdr } +func (b *testBackend) CurrentBlock() *types.Header { + return b.CurrentHeader() +} + func 
(b *testBackend) ChainDb() ethdb.Database { return b.db } +func (b *testBackend) GetCanonicalHash(number uint64) common.Hash { + return rawdb.ReadCanonicalHash(b.db, number) +} + +func (b *testBackend) GetHeader(hash common.Hash, number uint64) *types.Header { + hdr, _ := b.HeaderByHash(context.Background(), hash) + return hdr +} + +func (b *testBackend) GetReceiptsByHash(hash common.Hash) types.Receipts { + r, _ := b.GetReceipts(context.Background(), hash) + return r +} + func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { var ( hash common.Hash @@ -137,35 +154,19 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } -func (b *testBackend) BloomStatus() (uint64, uint64) { - return params.BloomBitsBlocks, b.sections +func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend { + return b.fm.NewMatcherBackend() } -func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - requests := make(chan chan *bloombits.Retrieval) - - go session.Multiplex(16, 0, requests) - go func() { - for { - // Wait for a service request or a shutdown - select { - case <-ctx.Done(): - return - - case request := <-requests: - task := <-request +func (b *testBackend) startFilterMaps(history uint64, noHistory bool) { + b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, 1, noHistory) + b.fm.Start() + b.fm.WaitIdle() +} - task.Bitsets = make([][]byte, len(task.Sections)) - for i, section := range task.Sections { - if rand.Int()%4 != 0 { // Handle occasional missing deliveries - head := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1) - task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head) - } - } - request <- task - } - } - }() +func (b *testBackend) stopFilterMaps() { + b.fm.Stop() + b.fm = nil } func (b *testBackend) setPending(block *types.Block, receipts 
types.Receipts) { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 6a3057326d..505bc071aa 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -46,15 +46,27 @@ func makeReceipt(addr common.Address) *types.Receipt { return receipt } -func BenchmarkFilters(b *testing.B) { +func BenchmarkFiltersIndexed(b *testing.B) { + benchmarkFilters(b, 0, false) +} + +func BenchmarkFiltersHalfIndexed(b *testing.B) { + benchmarkFilters(b, 50000, false) +} + +func BenchmarkFiltersUnindexed(b *testing.B) { + benchmarkFilters(b, 0, true) +} + +func benchmarkFilters(b *testing.B, history uint64, noHistory bool) { var ( - db = rawdb.NewMemoryDatabase() - _, sys = newTestFilterSystem(b, db, Config{}) - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(b, db, Config{}) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) gspec = &core.Genesis{ Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000)}}, @@ -94,9 +106,12 @@ func BenchmarkFilters(b *testing.B) { rawdb.WriteHeadBlockHash(db, block.Hash()) rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) } + backend.startFilterMaps(history, noHistory) + defer backend.stopFilterMaps() + b.ResetTimer() - filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, 
addr4}, nil) for i := 0; i < b.N; i++ { filter.begin = 0 @@ -107,7 +122,19 @@ func BenchmarkFilters(b *testing.B) { } } -func TestFilters(t *testing.T) { +func TestFiltersIndexed(t *testing.T) { + testFilters(t, 0, false) +} + +func TestFiltersHalfIndexed(t *testing.T) { + testFilters(t, 500, false) +} + +func TestFiltersUnindexed(t *testing.T) { + testFilters(t, 0, true) +} + +func testFilters(t *testing.T, history uint64, noHistory bool) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) @@ -279,6 +306,9 @@ func TestFilters(t *testing.T) { }) backend.setPending(pchain[0], preceipts[0]) + backend.startFilterMaps(history, noHistory) + defer backend.stopFilterMaps() + for i, tc := range []struct { f *Filter want string @@ -387,3 +417,137 @@ func TestFilters(t *testing.T) { } }) } + +func TestRangeLogs(t *testing.T) { + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + gspec = &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{}, + BaseFee: big.NewInt(params.InitialBaseFee), + } + ) + _, err := gspec.Commit(db, triedb.NewDatabase(db, nil)) + if err != nil { + t.Fatal(err) + } + chain, _ := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {}) + var l uint64 + bc, err := core.NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, &l) + if err != nil { + t.Fatal(err) + } + _, err = bc.InsertChain(chain[:600]) + if err != nil { + t.Fatal(err) + } + + backend.startFilterMaps(200, false) + defer backend.stopFilterMaps() + + var ( + testCase, event int + filter *Filter + addresses = []common.Address{common.Address{}} + ) + + newFilter := func(begin, end int64) { + testCase++ + event = 0 + filter = sys.NewRangeFilter(begin, end, addresses, nil) + filter.rangeLogsTestHook = make(chan rangeLogsTestEvent) + go func(filter *Filter) { + filter.Logs(context.Background()) + // ensure that filter will 
not be blocked if we exit early + for range filter.rangeLogsTestHook { + } + }(filter) + } + + expEvent := func(exp rangeLogsTestEvent) { + event++ + ev := <-filter.rangeLogsTestHook + if ev != exp { + t.Fatalf("Test case #%d: wrong test event #%d received (got %v, expected %v)", testCase, event, ev, exp) + } + } + + // test case #1 + newFilter(300, 500) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400}) + if _, err := bc.InsertChain(chain[600:700]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail + expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + + // test case #2 + newFilter(400, int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700}) + if _, err := bc.InsertChain(chain[700:800]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 701, 800}) + if err := bc.SetHead(750); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + + // test case #3 + newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750}) + if err := bc.SetHead(740); 
err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740}) + if _, err := bc.InsertChain(chain[740:750]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740}) + // trimmed at the beginning of the next iteration + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 740, 740}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + + // test case #4 + newFilter(400, int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550}) + if _, err := bc.InsertChain(chain[750:1000]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750}) + // indexed range affected by tail pruning so we have to discard the entire + // match set + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000}) +} diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 0303a0a6ea..0f501ac266 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -45,7 +45,7 @@ import ( 
"github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -620,11 +620,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { panic("implement me") } -func (b testBackend) BloomStatus() (uint64, uint64) { panic("implement me") } -func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { +func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend { panic("implement me") } - func TestEstimateGas(t *testing.T) { t.Parallel() // Initialize test accounts diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 3d8f7aa700..9e2ea2c876 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -27,7 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -93,8 +93,8 @@ type Backend interface { GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - BloomStatus() (uint64, uint64) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + NewMatcherBackend() filtermaps.MatcherBackend } func GetAPIs(apiBackend Backend) []rpc.API { diff --git 
a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 7172fc883f..7113fde0b8 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -30,7 +30,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -393,12 +393,12 @@ func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction, func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { return nil, nil } -func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } -func (b *backendMock) BloomStatus() (uint64, uint64) { return 0, 0 } -func (b *backendMock) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {} -func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } +func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } +func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return nil } func (b *backendMock) Engine() consensus.Engine { return nil } + +func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil } diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index 79a7a22e0b..a2f8040396 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -46,7 +46,7 @@ var ( // - Version 1: storage.Incomplete field is removed // - Version 2: add post-modification state values // - Version 3: a flag has been 
added to indicate whether the storage slot key is the raw key or a hash -const journalVersion uint64 = 3 +const journalVersion uint64 = 4 // TODO: document the version 4 format change in the version list above // loadJournal tries to parse the layer journal from the disk. func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {