eth/filters: send rpctransactions in pending-subscription (#26126)

This PR changes the pending tx subscription to return RPCTransaction types instead of normal Transaction objects. This will fix the inconsistencies with other tx returning API methods (i.e. getTransactionByHash), and also fill in the sender value for the tx.

co-authored by @s1na
pull/26180/head
Martin Holst Swende 2 years ago committed by GitHub
parent e34e540e4c
commit 8c5ce1107b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      accounts/abi/bind/backends/simulated.go
  2. 6
      eth/filters/api.go
  3. 3
      eth/filters/filter_system.go
  4. 8
      eth/filters/filter_system_test.go
  5. 16
      internal/ethapi/api.go
  6. 10
      internal/ethapi/backend.go

@ -915,6 +915,14 @@ func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.Matche
panic("not supported")
}
func (fb *filterBackend) ChainConfig() *params.ChainConfig {
panic("not supported")
}
func (fb *filterBackend) CurrentHeader() *types.Header {
panic("not supported")
}
func nullSubscription() event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/rpc"
)
@ -147,15 +148,18 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
go func() {
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)
chainConfig := api.sys.backend.ChainConfig()
for {
select {
case txs := <-txs:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
latest := api.sys.backend.CurrentHeader()
for _, tx := range txs {
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
rpcTx := ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)
notifier.Notify(rpcSub.ID, rpcTx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru"
)
@ -61,6 +62,8 @@ type Backend interface {
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
CurrentHeader() *types.Header
ChainConfig() *params.ChainConfig
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription

@ -50,6 +50,14 @@ type testBackend struct {
chainFeed event.Feed
}
func (b *testBackend) ChainConfig() *params.ChainConfig {
panic("implement me")
}
func (b *testBackend) CurrentHeader() *types.Header {
panic("implement me")
}
func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}

@ -171,7 +171,7 @@ func (s *TxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction {
for account, txs := range pending {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["pending"][account.Hex()] = dump
}
@ -179,7 +179,7 @@ func (s *TxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction {
for account, txs := range queue {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["queued"][account.Hex()] = dump
}
@ -195,14 +195,14 @@ func (s *TxPoolAPI) ContentFrom(addr common.Address) map[string]map[string]*RPCT
// Build the pending transactions
dump := make(map[string]*RPCTransaction, len(pending))
for _, tx := range pending {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["pending"] = dump
// Build the queued transactions
dump = make(map[string]*RPCTransaction, len(queue))
for _, tx := range queue {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
dump[fmt.Sprintf("%d", tx.Nonce())] = NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig())
}
content["queued"] = dump
@ -1344,8 +1344,8 @@ func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber
return result
}
// newRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation
func newRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction {
// NewRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation
func NewRPCPendingTransaction(tx *types.Transaction, current *types.Header, config *params.ChainConfig) *RPCTransaction {
var baseFee *big.Int
blockNumber := uint64(0)
if current != nil {
@ -1577,7 +1577,7 @@ func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H
}
// No finalized transaction, try to retrieve it from the pool
if tx := s.b.GetPoolTransaction(hash); tx != nil {
return newRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil
}
// Transaction unknown, return as such
@ -1847,7 +1847,7 @@ func (s *TransactionAPI) PendingTransactions() ([]*RPCTransaction, error) {
for _, tx := range pending {
from, _ := types.Sender(s.signer, tx)
if _, exists := accounts[from]; exists {
transactions = append(transactions, newRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()))
transactions = append(transactions, NewRPCPendingTransaction(tx, curHeader, s.b.ChainConfig()))
}
}
return transactions, nil

@ -27,10 +27,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
@ -87,9 +87,15 @@ type Backend interface {
ChainConfig() *params.ChainConfig
Engine() consensus.Engine
// This is copied from filters.Backend
// eth/filters needs to be initialized from this backend type, so methods needed by
// it must also be included here.
filters.Backend
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
func GetAPIs(apiBackend Backend) []rpc.API {

Loading…
Cancel
Save