From 5b1a04b9c749d804b51159fe12246c56de8515c1 Mon Sep 17 00:00:00 2001 From: lmittmann Date: Wed, 12 Oct 2022 11:54:52 +0200 Subject: [PATCH] eth/filters, ethclient/gethclient: add fullTx option to pending tx filter (#25186) This PR adds a way to subscribe to the _full_ pending transactions, as opposed to just being notified about hashes. In use cases where client subscribes to newPendingTransactions and gets txhashes only to then request the actual transaction, the caller can now shortcut that flow and obtain the transactions directly. Co-authored-by: Felix Lange --- eth/filters/api.go | 38 +++++++++++++++--------- eth/filters/filter_system.go | 28 ++++++++---------- eth/filters/filter_system_test.go | 22 +++++++------- ethclient/gethclient/gethclient.go | 7 ++++- ethclient/gethclient/gethclient_test.go | 39 ++++++++++++++++++++++++- 5 files changed, 91 insertions(+), 43 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index 43e63d5ba9..f52bff6f3c 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -38,6 +38,7 @@ type filter struct { typ Type deadline *time.Timer // filter is inactive when deadline triggers hashes []common.Hash + txs []*types.Transaction crit FilterCriteria logs []*types.Log s *Subscription // associated subscription in event system @@ -96,28 +97,28 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) { } } -// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// NewPendingTransactionFilter creates a filter that fetches pending transactions // as transactions enter the pending state. // // It is part of the filter package because this filter can be used through the // `eth_getFilterChanges` polling method that is also used for log filters. func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan []common.Hash) + pendingTxs = make(chan []*types.Transaction) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { for { select { - case ph := <-pendingTxs: + case pTx := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph...) + f.txs = append(f.txs, pTx...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -132,9 +133,10 @@ func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { return pendingTxSub.ID } -// NewPendingTransactions creates a subscription that is triggered each time a transaction -// enters the transaction pool and was signed from one of the transactions this nodes manages. -func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { +// NewPendingTransactions creates a subscription that is triggered each time a +// transaction enters the transaction pool. If fullTx is true the full tx is +// sent to the client, otherwise the hash is sent. +func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -143,16 +145,20 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscrip rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan []common.Hash, 128) - pendingTxSub := api.events.SubscribePendingTxs(txHashes) + txs := make(chan []*types.Transaction, 128) + pendingTxSub := api.events.SubscribePendingTxs(txs) for { select { - case hashes := <-txHashes: + case txs := <-txs: // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification - for _, h := range hashes { - notifier.Notify(rpcSub.ID, h) + for _, tx := range txs { + if fullTx != nil && *fullTx { + notifier.Notify(rpcSub.ID, tx) + } else { + notifier.Notify(rpcSub.ID, tx.Hash()) + } } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() @@ -411,10 +417,14 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.deadline.Reset(api.timeout) switch f.typ { - case PendingTransactionsSubscription, BlocksSubscription: + case BlocksSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil + case PendingTransactionsSubscription: + txs := f.txs + f.txs = nil + return txs, nil case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ab9858f454..e86a67abfd 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -124,8 +124,8 @@ const ( PendingLogsSubscription // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. MinedAndPendingLogsSubscription - // PendingTransactionsSubscription queries tx hashes for pending - // transactions entering the pending state + // PendingTransactionsSubscription queries for pending transactions entering + // the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription @@ -151,7 +151,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan []common.Hash + txs chan []*types.Transaction headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -244,7 +244,7 @@ func (sub *Subscription) Unsubscribe() { case sub.es.uninstall <- sub.f: break uninstallLoop case <-sub.f.logs: - case <-sub.f.hashes: + case <-sub.f.txs: case <-sub.f.headers: } } @@ -311,7 +311,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -328,7 +328,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -345,7 +345,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -361,7 +361,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -369,15 +369,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } -// SubscribePendingTxs creates a subscription that writes transaction hashes for +// SubscribePendingTxs creates a subscription that writes transactions for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: hashes, + txs: txs, headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -421,12 +421,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog } func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { - hashes := make([]common.Hash, 0, len(ev.Txs)) - for _, tx := range ev.Txs { - hashes = append(hashes, tx.Hash()) - } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes + f.txs <- ev.Txs } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 4386f0e5bd..a41271f7b8 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -240,7 +240,7 @@ func TestPendingTxFilter(t *testing.T) { types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } - hashes []common.Hash + txs []*types.Transaction ) fid0 := api.NewPendingTransactionFilter() @@ -255,9 +255,9 @@ func TestPendingTxFilter(t *testing.T) { t.Fatalf("Unable to retrieve logs: %v", err) } - h := results.([]common.Hash) - hashes = append(hashes, h...) - if len(hashes) >= len(transactions) { + tx := results.([]*types.Transaction) + txs = append(txs, tx...) + if len(txs) >= len(transactions) { break } // check timeout @@ -268,13 +268,13 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(100 * time.Millisecond) } - if len(hashes) != len(transactions) { - t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + if len(txs) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs)) return } - for i := range hashes { - if hashes[i] != transactions[i].Hash() { - t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + for i := range txs { + if txs[i].Hash() != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash()) } } } @@ -705,11 +705,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) { fids[i] = fid // Wait for at least one tx to arrive in filter for { - hashes, err := api.GetFilterChanges(fid) + txs, err := api.GetFilterChanges(fid) if err != nil { t.Fatalf("Filter should exist: %v\n", err) } - if len(hashes.([]common.Hash)) > 0 { + if len(txs.([]*types.Transaction)) > 0 { break } runtime.Gosched() diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index e182911aa5..8211ee75ae 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -166,7 +166,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) { return &result, err } -// SubscribePendingTransactions subscribes to new pending transactions. +// SubscribeFullPendingTransactions subscribes to new pending transactions. +func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) +} + +// SubscribePendingTransactions subscribes to new pending transaction hashes. func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index da0118887b..e60490c616 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -122,8 +122,11 @@ func TestGethClient(t *testing.T) { "TestSetHead", func(t *testing.T) { testSetHead(t, client) }, }, { - "TestSubscribePendingTxs", + "TestSubscribePendingTxHashes", func(t *testing.T) { testSubscribePendingTransactions(t, client) }, + }, { + "TestSubscribePendingTxs", + func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) }, }, { "TestCallContract", func(t *testing.T) { testCallContract(t, client) }, @@ -303,6 +306,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) { } } +func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) { + ec := New(client) + ethcl := ethclient.NewClient(client) + // Subscribe to Transactions + ch := make(chan *types.Transaction) + ec.SubscribeFullPendingTransactions(context.Background(), ch) + // Send a transaction + chainID, err := ethcl.ChainID(context.Background()) + if err != nil { + t.Fatal(err) + } + // Create transaction + tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil) + signer := types.LatestSignerForChainID(chainID) + signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey) + if err != nil { + t.Fatal(err) + } + signedTx, err := tx.WithSignature(signer, signature) + if err != nil { + t.Fatal(err) + } + // Send transaction + err = ethcl.SendTransaction(context.Background(), signedTx) + if err != nil { + t.Fatal(err) + } + // Check that the transaction was send over the channel + tx = <-ch + if tx.Hash() != signedTx.Hash() { + t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash()) + } +} + func testCallContract(t *testing.T, client *rpc.Client) { ec := New(client) msg := ethereum.CallMsg{