eth/filters: remove support for pending logs (#29574)

This change removes support for subscribing to pending logs. 

"Pending logs" were always an odd feature, because it can never be fully reliable. When support for it was added many years ago, the intention was for this to be used by wallet apps to show the 'potential future token balance' of accounts, i.e. as a way of notifying the user of incoming transfers before they were mined. In order to generate the pending logs, the node must pick a subset of all public mempool transactions, execute them in the EVM, and then dispatch the resulting logs to API consumers.
pull/29604/head
Felix Lange 7 months ago committed by GitHub
parent ad3d8cb12a
commit 82b0dec713
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      cmd/utils/flags.go
  2. 7
      eth/filters/api.go
  3. 41
      eth/filters/filter.go
  4. 187
      eth/filters/filter_system.go
  5. 387
      eth/filters/filter_system_test.go
  6. 8
      eth/filters/filter_test.go
  7. 2
      ethclient/gethclient/gethclient_test.go
  8. 2
      ethclient/simulated/backend.go

@ -1959,7 +1959,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, false),
Service: filters.NewFilterAPI(filterSystem),
}})
return filterSystem
}

@ -37,6 +37,7 @@ var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
errInvalidBlockRange = errors.New("invalid block range params")
errPendingLogsUnsupported = errors.New("pending logs are not supported")
errExceedMaxTopics = errors.New("exceed max topics")
)
@ -70,10 +71,10 @@ type FilterAPI struct {
}
// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
func NewFilterAPI(system *FilterSystem) *FilterAPI {
api := &FilterAPI{
sys: system,
events: NewEventSystem(system, lightMode),
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
}
@ -456,7 +457,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.txs = nil
return hashes, nil
}
case LogsSubscription, MinedAndPendingLogsSubscription:
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil

@ -108,19 +108,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return f.blockLogs(ctx, header)
}
var (
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
endPending = f.end == rpc.PendingBlockNumber.Int64()
)
// special case for pending logs
if beginPending && !endPending {
return nil, errInvalidBlockRange
}
// Short-cut if all we care about is pending logs
if beginPending && endPending {
return f.pendingLogs(), nil
// Disallow pending logs.
if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
return nil, errPendingLogsUnsupported
}
resolveSpecial := func(number int64) (int64, error) {
@ -165,17 +155,8 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
if err != nil {
// if an error occurs during extraction, we do return the extracted data
return logs, err
}
// Append the pending ones
if endPending {
pendingLogs := f.pendingLogs()
logs = append(logs, pendingLogs...)
}
return logs, nil
}
}
}
@ -332,22 +313,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
return logs, nil
}
// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() []*types.Log {
block, receipts, _ := f.sys.backend.Pending()
if block == nil || receipts == nil {
return nil
}
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
return nil
}
// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
var check = func(log *types.Log) bool {

@ -30,8 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@ -63,7 +61,6 @@ type Backend interface {
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
Pending() (*types.Block, types.Receipts, *state.StateDB)
CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
@ -152,10 +149,6 @@ const (
UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription
// PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries for pending transactions entering
// the pending state
PendingTransactionsSubscription
@ -194,8 +187,6 @@ type subscription struct {
type EventSystem struct {
backend Backend
sys *FilterSystem
lightMode bool
lastHead *types.Header
// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
@ -218,11 +209,10 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
func NewEventSystem(sys *FilterSystem) *EventSystem {
m := &EventSystem{
sys: sys,
backend: sys.backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
@ -310,10 +300,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
to = rpc.BlockNumber(crit.ToBlock.Int64())
}
// only interested in pending logs
if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribePendingLogs(crit, logs), nil
// Pending logs are not supported anymore.
if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber {
return nil, errPendingLogsUnsupported
}
// only interested in new mined logs
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
@ -322,10 +313,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
if from >= 0 && to >= 0 && to >= from {
return es.subscribeLogs(crit, logs), nil
}
// interested in mined logs from a specific block number, new logs and pending logs
if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribeMinedPendingLogs(crit, logs), nil
}
// interested in logs from a specific block number to new mined blocks
if from >= 0 && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
@ -333,23 +320,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
return nil, errInvalidBlockRange
}
// subscribeMinedPendingLogs creates a subscription that returned mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
@ -367,23 +337,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
return es.subscribe(sub)
}
// subscribePendingLogs creates a subscription that writes contract event logs for
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeNewHeads creates a subscription that writes the header of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
@ -430,18 +383,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
}
}
func (es *EventSystem) handlePendingLogs(filters filterIndex, logs []*types.Log) {
if len(logs) == 0 {
return
}
for _, f := range filters[PendingLogsSubscription] {
matchedLogs := filterLogs(logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
for _, f := range filters[PendingTransactionsSubscription] {
f.txs <- ev.Txs
@ -452,91 +393,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 {
continue
}
if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 {
continue
}
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
}
func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
if oldh == nil {
return
}
newh := newHeader
// find common ancestor, create list of rolled back and new block hashes
var oldHeaders, newHeaders []*types.Header
for oldh.Hash() != newh.Hash() {
if oldh.Number.Uint64() >= newh.Number.Uint64() {
oldHeaders = append(oldHeaders, oldh)
oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
}
if oldh.Number.Uint64() < newh.Number.Uint64() {
newHeaders = append(newHeaders, newh)
newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
if newh == nil {
// happens when CHT syncing, nothing to do
newh = oldh
}
}
}
// roll back old blocks
for _, h := range oldHeaders {
callBack(h, true)
}
// check new blocks (array is in reverse order)
for i := len(newHeaders) - 1; i >= 0; i-- {
callBack(newHeaders[i], false)
}
}
// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if !bloomFilter(header.Bloom, addresses, topics) {
return nil
}
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
unfiltered := append([]*types.Log{}, cached.logs...)
for i, log := range unfiltered {
// Don't modify in-cache elements
logcopy := *log
logcopy.Removed = remove
// Swap copy in-place
unfiltered[i] = &logcopy
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
// Txhash is already resolved
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
return logs
}
// Resolve txhash
body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
for _, log := range logs {
// logs are already copied, safe to modify
log.TxHash = body.Transactions[log.TxIndex].Hash()
}
return logs
}
// eventLoop (un)installs filters and processes mux events.
@ -564,46 +420,13 @@ func (es *EventSystem) eventLoop() {
es.handleLogs(index, ev.Logs)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
// If we have no pending log subscription,
// we don't need to collect any pending logs.
if len(index[PendingLogsSubscription]) == 0 {
continue
}
// Pull the pending logs if there is a new chain head.
pendingBlock, pendingReceipts, _ := es.backend.Pending()
if pendingBlock == nil || pendingReceipts == nil {
continue
}
if pendingBlock.ParentHash() != ev.Block.Hash() {
continue
}
var logs []*types.Log
for _, receipt := range pendingReceipts {
if len(receipt.Logs) > 0 {
logs = append(logs, receipt.Logs...)
}
}
es.handlePendingLogs(index, logs)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
index[LogsSubscription][f.id] = f
index[PendingLogsSubscription][f.id] = f
} else {
index[f.typ][f.id] = f
}
close(f.installed)
case f := <-es.uninstall:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(index[LogsSubscription], f.id)
delete(index[PendingLogsSubscription], f.id)
} else {
delete(index[f.typ], f.id)
}
close(f.err)
// System stopped

@ -19,7 +19,6 @@ package filters
import (
"context"
"errors"
"fmt"
"math/big"
"math/rand"
"reflect"
@ -27,15 +26,12 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
@ -125,10 +121,6 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
return logs, nil
}
func (b *testBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) {
return b.pendingBlock, b.pendingReceipts, nil
}
func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
}
@ -181,15 +173,6 @@ func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {
b.pendingReceipts = receipts
}
func (b *testBackend) notifyPending(logs []*types.Log) {
genesis := &core.Genesis{
Config: params.TestChainConfig,
}
_, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(i int, b *core.BlockGen) {})
b.setPending(blocks[1], []*types.Receipt{{Logs: logs}})
b.chainFeed.Send(core.ChainEvent{Block: blocks[0]})
}
func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
backend := &testBackend{db: db}
sys := NewFilterSystem(backend, cfg)
@ -207,7 +190,7 @@ func TestBlockSubscription(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
genesis = &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
@ -262,7 +245,7 @@ func TestPendingTxFilter(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@ -318,7 +301,7 @@ func TestPendingTxFilterFullTx(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@ -374,7 +357,7 @@ func TestLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
testCases = []struct {
crit FilterCriteria
@ -386,8 +369,6 @@ func TestLogFilterCreation(t *testing.T) {
{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true},
// "mined" block range to pending
{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true},
// new mined and pending blocks
{FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true},
// from block "higher" than to block
{FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false},
// from block "higher" than to block
@ -423,7 +404,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
)
// different situations where log filter creation should fail.
@ -449,7 +430,7 @@ func TestInvalidGetLogsRequest(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)
@ -475,7 +456,7 @@ func TestInvalidGetRangeLogsRequest(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
)
if _, err := api.GetLogs(context.Background(), FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}); err != errInvalidBlockRange {
@ -490,7 +471,7 @@ func TestLogFilter(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -509,9 +490,6 @@ func TestLogFilter(t *testing.T) {
{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3},
}
expectedCase7 = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]}
expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]}
testCases = []struct {
crit FilterCriteria
expected []*types.Log
@ -529,20 +507,14 @@ func TestLogFilter(t *testing.T) {
4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""},
// match logs based on multiple addresses and "or" topics
5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""},
// logs in the pending block
6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""},
// mined logs with block num >= 2 or pending logs
7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""},
// all "mined" logs with block num >= 2
8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
// all "mined" logs
9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
7: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
// all "mined" logs with 1>= block num <=2 and topic secondTopic
10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
// all "mined" and pending logs with topic firstTopic
11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""},
8: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
// match all logs due to wildcard topic
12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
9: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
}
)
@ -557,16 +529,13 @@ func TestLogFilter(t *testing.T) {
t.Fatal("Logs event not delivered")
}
// set pending logs
backend.notifyPending(allLogs)
for i, tt := range testCases {
var fetched []*types.Log
timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
t.Fatalf("Unable to fetch logs: %v", err)
t.Fatalf("test %d: unable to fetch logs: %v", i, err)
}
fetched = append(fetched, results.([]*types.Log)...)
@ -597,326 +566,6 @@ func TestLogFilter(t *testing.T) {
}
}
// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
allLogs = [][]*types.Log{
{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}},
{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}},
{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}},
{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}},
{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}},
{
{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
{Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
},
}
pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64())
testCases = []struct {
crit ethereum.FilterQuery
expected []*types.Log
c chan []*types.Log
sub *Subscription
err chan error
}{
// match all
{
ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
flattenLogs(allLogs),
nil, nil, nil,
},
// match none due to no matching addresses
{
ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil,
nil, nil, nil,
},
// match logs based on addresses, ignore topics
{
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil, nil,
},
// match none due to no matching topics (match with address)
{
ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil,
nil, nil, nil,
},
// match logs based on addresses and topics
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[3:5]), allLogs[5][0]),
nil, nil, nil,
},
// match logs based on multiple addresses and "or" topics
{
ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[2:5]), allLogs[5][0]),
nil, nil, nil,
},
// multiple pending logs, should match only 2 topics from the logs in block 5
{
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
[]*types.Log{allLogs[5][0], allLogs[5][2]},
nil, nil, nil,
},
// match none due to only matching new mined logs
{
ethereum.FilterQuery{},
nil,
nil, nil, nil,
},
// match none due to only matching mined logs within a specific block range
{
ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)},
nil,
nil, nil, nil,
},
// match all due to matching mined and pending logs
{
ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())},
flattenLogs(allLogs),
nil, nil, nil,
},
// match none due to matching logs from a specific block number to new mined blocks
{
ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
nil,
nil, nil, nil,
},
}
)
// create all subscriptions, this ensures all subscriptions are created before the events are posted.
// on slow machines this could otherwise lead to missing events when the subscription is created after
// (some) events are posted.
for i := range testCases {
testCases[i].c = make(chan []*types.Log)
testCases[i].err = make(chan error, 1)
var err error
testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
if err != nil {
t.Fatalf("SubscribeLogs %d failed: %v\n", i, err)
}
}
for n, test := range testCases {
i := n
tt := test
go func() {
defer tt.sub.Unsubscribe()
var fetched []*types.Log
timeout := time.After(1 * time.Second)
fetchLoop:
for {
select {
case logs := <-tt.c:
// Do not break early if we've fetched greater, or equal,
// to the number of logs expected. This ensures we do not
// deadlock the filter system because it will do a blocking
// send on this channel if another log arrives.
fetched = append(fetched, logs...)
case <-timeout:
break fetchLoop
}
}
if len(fetched) != len(tt.expected) {
tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
return
}
for l := range fetched {
if fetched[l].Removed {
tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i)
return
}
if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i)
return
}
}
tt.err <- nil
}()
}
// set pending logs
var flattenLogs []*types.Log
for _, logs := range allLogs {
flattenLogs = append(flattenLogs, logs...)
}
backend.notifyPending(flattenLogs)
for i := range testCases {
err := <-testCases[i].err
if err != nil {
t.Fatalf("test %d failed: %v", i, err)
}
<-testCases[i].sub.Err()
}
}
func TestLightFilterLogs(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, true)
signer = types.HomesteadSigner{}
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
// posted twice, once as regular logs and once as pending logs.
allLogs = []*types.Log{
// Block 1
{Address: firstAddr, Topics: []common.Hash{}, Data: []byte{}, BlockNumber: 2, Index: 0},
// Block 2
{Address: firstAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 0},
{Address: secondAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 1},
{Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 3, Index: 2},
// Block 3
{Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 4, Index: 0},
}
testCases = []struct {
crit FilterCriteria
expected []*types.Log
id rpc.ID
}{
// match all
0: {FilterCriteria{}, allLogs, ""},
// match none due to no matching addresses
1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""},
// match logs based on addresses, ignore topics
2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
// match logs based on addresses and topics
3: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""},
// all logs with block num >= 3
4: {FilterCriteria{FromBlock: big.NewInt(3), ToBlock: big.NewInt(5)}, allLogs[1:], ""},
// all logs
5: {FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(5)}, allLogs, ""},
// all logs with 1>= block num <=2 and topic secondTopic
6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(3), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
}
key, _ = crypto.GenerateKey()
addr = crypto.PubkeyToAddress(key.PublicKey)
genesis = &core.Genesis{Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{
addr: {Balance: big.NewInt(params.Ether)},
},
}
receipts = []*types.Receipt{{
Logs: []*types.Log{allLogs[0]},
}, {
Logs: []*types.Log{allLogs[1], allLogs[2], allLogs[3]},
}, {
Logs: []*types.Log{allLogs[4]},
}}
)
_, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 4, func(i int, b *core.BlockGen) {
if i == 0 {
return
}
receipts[i-1].Bloom = types.CreateBloom(types.Receipts{receipts[i-1]})
b.AddUncheckedReceipt(receipts[i-1])
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{Nonce: uint64(i - 1), To: &common.Address{}, Value: big.NewInt(1000), Gas: params.TxGas, GasPrice: b.BaseFee(), Data: nil}), signer, key)
b.AddTx(tx)
})
for i, block := range blocks {
rawdb.WriteBlock(db, block)
rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(db, block.Hash())
if i > 0 {
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), []*types.Receipt{receipts[i-1]})
}
}
// create all filters
for i := range testCases {
id, err := api.NewFilter(testCases[i].crit)
if err != nil {
t.Fatal(err)
}
testCases[i].id = id
}
// raise events
time.Sleep(1 * time.Second)
for _, block := range blocks {
backend.chainFeed.Send(core.ChainEvent{Block: block, Hash: common.Hash{}, Logs: allLogs})
}
for i, tt := range testCases {
var fetched []*types.Log
timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
t.Fatalf("Unable to fetch logs: %v", err)
}
fetched = append(fetched, results.([]*types.Log)...)
if len(fetched) >= len(tt.expected) {
break
}
// check timeout
if time.Now().After(timeout) {
break
}
time.Sleep(100 * time.Millisecond)
}
if len(fetched) != len(tt.expected) {
t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
return
}
for l := range fetched {
if fetched[l].Removed {
t.Errorf("expected log not to be removed for log %d in case %d", l, i)
}
expected := *tt.expected[l]
blockNum := expected.BlockNumber - 1
expected.BlockHash = blocks[blockNum].Hash()
expected.TxHash = blocks[blockNum].Transactions()[0].Hash()
if !reflect.DeepEqual(fetched[l], &expected) {
t.Errorf("invalid log on index %d for case %d", l, i)
}
}
}
}
// TestPendingTxFilterDeadlock tests if the event loop hangs when pending
// txes arrive at the same time that one of multiple filters is timing out.
// Please refer to #22131 for more details.
@ -927,7 +576,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
done = make(chan struct{})
)
@ -979,11 +628,3 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
}
}
}
func flattenLogs(pl [][]*types.Log) []*types.Log {
var logs []*types.Log
for _, l := range pl {
logs = append(logs, l...)
}
return logs
}

@ -345,15 +345,15 @@ func TestFilters(t *testing.T) {
},
{
f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xd5e8d4e4eb51a2a2a6ec20ef68a4c2801240743c8deb77a6a1d118ac3eefb725","logIndex":"0x0","removed":false}]`,
err: errPendingLogsUnsupported.Error(),
},
{
f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x9a87842100a638dfa5da8842b4beda691d2fd77b0c84b57f24ecfa9fb208f747","transactionIndex":"0x0","blockHash":"0xb360bad5265261c075ece02d3bf0e39498a6a76310482cdfd90588748e6c5ee0","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x4110587c1b8d86edc85dce929a34127f1cb8809515a9f177c91c866de3eb0638","transactionIndex":"0x0","blockHash":"0xd5e8d4e4eb51a2a2a6ec20ef68a4c2801240743c8deb77a6a1d118ac3eefb725","logIndex":"0x0","removed":false}]`,
err: errPendingLogsUnsupported.Error(),
},
{
f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil),
err: errInvalidBlockRange.Error(),
err: errPendingLogsUnsupported.Error(),
},
} {
logs, err := tc.f.Logs(context.Background())
@ -375,7 +375,7 @@ func TestFilters(t *testing.T) {
}
t.Run("timeout", func(t *testing.T) {
f := sys.NewRangeFilter(0, -1, nil, nil)
f := sys.NewRangeFilter(0, rpc.LatestBlockNumber.Int64(), nil, nil)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()
_, err := f.Logs(ctx)

@ -65,7 +65,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) {
filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{})
n.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, false),
Service: filters.NewFilterAPI(filterSystem),
}})
// Import the test chain.

@ -114,7 +114,7 @@ func newWithNode(stack *node.Node, conf *eth.Config, blockPeriod uint64) (*Backe
filterSystem := filters.NewFilterSystem(backend.APIBackend, filters.Config{})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, false),
Service: filters.NewFilterAPI(filterSystem),
}})
// Start the node
if err := stack.Start(); err != nil {

Loading…
Cancel
Save