From 40cdcf8c47ff094775aca08fd5d94051f9cf1dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Mon, 13 May 2019 13:41:10 +0200 Subject: [PATCH] les, light: implement ODR transaction lookup by hash (#19069) * les, light: implement ODR transaction lookup by hash * les: delete useless file * internal/ethapi: always use backend to find transaction * les, eth, internal/ethapi: renamed GetCanonicalTransaction to GetTransaction * light: add canonical header verification to GetTransaction --- eth/api_backend.go | 6 +++++ internal/ethapi/api.go | 23 ++++++++++++------- internal/ethapi/backend.go | 1 + les/api_backend.go | 4 ++++ les/backend.go | 3 ++- les/handler.go | 17 +++++++++----- les/handler_test.go | 26 +++++++++++----------- les/helper_test.go | 4 +++- les/odr.go | 1 + les/odr_requests.go | 40 +++++++++++++++++++++++++++++++++ les/odr_test.go | 45 +++++++++++++++++++++++++++++--------- les/peer.go | 2 +- les/protocol.go | 8 ------- les/txrelay.go | 15 +++++++++---- light/odr.go | 17 ++++++++++++++ light/odr_util.go | 21 ++++++++++++++++++ 16 files changed, 182 insertions(+), 51 deletions(-) diff --git a/eth/api_backend.go b/eth/api_backend.go index 9ac06ffa4f..db0e8cf410 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -178,6 +179,11 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction return b.eth.txPool.Get(hash) } +func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { + tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash) + return tx, blockHash, blockNumber, index, nil +} + func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { return b.eth.txPool.State().GetNonce(addr), nil } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index e918df9bbb..07142d66be 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1155,25 +1155,32 @@ func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, addr } // GetTransactionByHash returns the transaction for the given hash -func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) *RPCTransaction { +func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { // Try to return an already finalized transaction - if tx, blockHash, blockNumber, index := rawdb.ReadTransaction(s.b.ChainDb(), hash); tx != nil { - return newRPCTransaction(tx, blockHash, blockNumber, index) + tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) + if err != nil { + return nil, err + } + if tx != nil { + return newRPCTransaction(tx, blockHash, blockNumber, index), nil } // No finalized transaction, try to retrieve it from the pool if tx := s.b.GetPoolTransaction(hash); tx != nil { - return newRPCPendingTransaction(tx) + return newRPCPendingTransaction(tx), nil } + // Transaction unknown, return as such - return nil + return nil, nil } // GetRawTransactionByHash returns the bytes of the transaction for the given hash. func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { - var tx *types.Transaction - // Retrieve a finalized transaction, or a pooled otherwise - if tx, _, _, _ = rawdb.ReadTransaction(s.b.ChainDb(), hash); tx == nil { + tx, _, _, _, err := s.b.GetTransaction(ctx, hash) + if err != nil { + return nil, err + } + if tx == nil { if tx = s.b.GetPoolTransaction(hash); tx == nil { // Transaction not found anywhere, abort return nil, nil diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 0c6c7eace8..9229ccfb64 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -62,6 +62,7 @@ type Backend interface { // TxPool API SendTx(ctx context.Context, signedTx *types.Transaction) error + GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) diff --git a/les/api_backend.go b/les/api_backend.go index 589cf572d2..6de15e7bd2 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -132,6 +132,10 @@ func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transactio return b.eth.txPool.GetTransaction(txHash) } +func (b *LesApiBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { + return light.GetTransaction(ctx, b.eth.odr, txHash) +} + func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { return b.eth.txPool.GetNonce(ctx, addr) } diff --git a/les/backend.go b/les/backend.go index 80a912816f..887f882108 100644 --- a/les/backend.go +++ b/les/backend.go @@ -114,9 +114,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { if leth.config.ULC != nil { trustedNodes = leth.config.ULC.TrustedServers } - leth.relay = NewLesTxRelay(peers, leth.reqDist) leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) + leth.relay = NewLesTxRelay(peers, leth.retriever) leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations) @@ -271,6 +271,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error { // Ethereum protocol. func (s *LightEthereum) Stop() error { s.odr.Stop() + s.relay.Stop() s.bloomIndexer.Close() s.chtIndexer.Close() s.blockchain.Stop() diff --git a/les/handler.go b/les/handler.go index d46eeb03a5..c6baeece4d 100644 --- a/les/handler.go +++ b/les/handler.go @@ -979,7 +979,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrRequestRejected, "") } go func() { - stats := make([]txStatus, len(req.Txs)) + stats := make([]light.TxStatus, len(req.Txs)) for i, tx := range req.Txs { if i != 0 && !task.waitOrStop() { return @@ -1014,7 +1014,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrRequestRejected, "") } go func() { - stats := make([]txStatus, len(req.Hashes)) + stats := make([]light.TxStatus, len(req.Hashes)) for i, hash := range req.Hashes { if i != 0 && !task.waitOrStop() { return @@ -1032,7 +1032,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.Log().Trace("Received tx status response") var resp struct { ReqID, BV uint64 - Status []txStatus + Status []light.TxStatus } if err := msg.Decode(&resp); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1040,6 +1040,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.Log().Trace("Received helper trie proof response") + deliverMsg = &Msg{ + MsgType: MsgTxStatus, + ReqID: resp.ReqID, + Obj: resp.Status, + } + default: p.Log().Trace("Received unknown message", "code", msg.Code) return errResp(ErrInvalidMsgCode, "%v", msg.Code) @@ -1097,8 +1104,8 @@ func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte { return nil } -func (pm *ProtocolManager) txStatus(hash common.Hash) txStatus { - var stat txStatus +func (pm *ProtocolManager) txStatus(hash common.Hash) light.TxStatus { + var stat light.TxStatus stat.Status = pm.txpool.Status([]common.Hash{hash})[0] // If the transaction is unknown to the pool, try looking it up locally if stat.Status == core.TxStatusUnknown { diff --git a/les/handler_test.go b/les/handler_test.go index c659f8088b..d8051c1450 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -443,7 +443,7 @@ func TestTransactionStatusLes2(t *testing.T) { var reqID uint64 - test := func(tx *types.Transaction, send bool, expStatus txStatus) { + test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) { reqID++ if send { cost := peer.GetRequestCost(SendTxV2Msg, 1) @@ -452,7 +452,7 @@ func TestTransactionStatusLes2(t *testing.T) { cost := peer.GetRequestCost(GetTxStatusMsg, 1) sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()}) } - if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []txStatus{expStatus}); err != nil { + if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil { t.Errorf("transaction status mismatch") } } @@ -461,20 +461,20 @@ func TestTransactionStatusLes2(t *testing.T) { // test error status by sending an underpriced transaction tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) - test(tx0, true, txStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()}) + test(tx0, true, light.TxStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()}) tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey) - test(tx1, false, txStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown - test(tx1, true, txStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending - test(tx1, true, txStatus{Status: core.TxStatusPending}) // adding it again should not return an error + test(tx1, false, light.TxStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown + test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending + test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // adding it again should not return an error tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey) tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey) // send transactions in the wrong order, tx3 should be queued - test(tx3, true, txStatus{Status: core.TxStatusQueued}) - test(tx2, true, txStatus{Status: core.TxStatusPending}) + test(tx3, true, light.TxStatus{Status: core.TxStatusQueued}) + test(tx2, true, light.TxStatus{Status: core.TxStatusPending}) // query again, now tx3 should be pending too - test(tx3, false, txStatus{Status: core.TxStatusPending}) + test(tx3, false, light.TxStatus{Status: core.TxStatusPending}) // generate and add a block with tx1 and tx2 included gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) { @@ -497,8 +497,8 @@ func TestTransactionStatusLes2(t *testing.T) { // check if their status is included now block1hash := rawdb.ReadCanonicalHash(db, 1) - test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}}) - test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}}) + test(tx1, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}}) + test(tx2, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}}) // create a reorg that rolls them back gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {}) @@ -516,6 +516,6 @@ func TestTransactionStatusLes2(t *testing.T) { t.Fatalf("pending count mismatch: have %d, want 3", pending) } // check if their status is pending again - test(tx1, false, txStatus{Status: core.TxStatusPending}) - test(tx2, false, txStatus{Status: core.TxStatusPending}) + test(tx1, false, light.TxStatus{Status: core.TxStatusPending}) + test(tx2, false, light.TxStatus{Status: core.TxStatusPending}) } diff --git a/les/helper_test.go b/les/helper_test.go index 020c69e175..9a302f8370 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -148,6 +148,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor } genesis = gspec.MustCommit(db) chain BlockChain + pool txPool ) if peers == nil { peers = newPeerSet() @@ -162,13 +163,14 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor panic(err) } chain = blockchain + pool = core.NewTxPool(core.DefaultTxPoolConfig, gspec.Config, blockchain) } indexConfig := light.TestServerIndexerConfig if lightSync { indexConfig = light.TestClientIndexerConfig } - pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig) + pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, pool, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig) if err != nil { return nil, err } diff --git a/les/odr.go b/les/odr.go index daf2ea19ed..9176924cb8 100644 --- a/les/odr.go +++ b/les/odr.go @@ -86,6 +86,7 @@ const ( MsgReceipts MsgProofsV2 MsgHelperTrieProofs + MsgTxStatus ) // Msg encodes a LES message that delivers reply data for a request diff --git a/les/odr_requests.go b/les/odr_requests.go index 66d6175b8a..328750d7ab 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -66,6 +66,8 @@ func LesRequest(req light.OdrRequest) LesOdrRequest { return (*ChtRequest)(r) case *light.BloomRequest: return (*BloomRequest)(r) + case *light.TxStatusRequest: + return (*TxStatusRequest)(r) default: return nil } @@ -471,6 +473,44 @@ func (r *BloomRequest) Validate(db ethdb.Database, msg *Msg) error { return nil } +// TxStatusRequest is the ODR request type for transaction status +type TxStatusRequest light.TxStatusRequest + +// GetCost returns the cost of the given ODR request according to the serving +// peer's cost table (implementation of LesOdrRequest) +func (r *TxStatusRequest) GetCost(peer *peer) uint64 { + return peer.GetRequestCost(GetTxStatusMsg, len(r.Hashes)) +} + +// CanSend tells if a certain peer is suitable for serving the given request +func (r *TxStatusRequest) CanSend(peer *peer) bool { + return peer.version >= lpv2 +} + +// Request sends an ODR request to the LES network (implementation of LesOdrRequest) +func (r *TxStatusRequest) Request(reqID uint64, peer *peer) error { + peer.Log().Debug("Requesting transaction status", "count", len(r.Hashes)) + return peer.RequestTxStatus(reqID, r.GetCost(peer), r.Hashes) +} + +// Valid processes an ODR request reply message from the LES network +// returns true and stores results in memory if the message was a valid reply +// to the request (implementation of LesOdrRequest) +func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error { + log.Debug("Validating transaction status", "count", len(r.Hashes)) + + // Ensure we have a correct message with a single block body + if msg.MsgType != MsgTxStatus { + return errInvalidMessageType + } + status := msg.Obj.([]light.TxStatus) + if len(status) != len(r.Hashes) { + return errInvalidEntryCount + } + r.Status = status + return nil +} + // readTraceDB stores the keys of database reads. We use this to check that received node // sets contain only the trie nodes necessary to make proofs pass. type readTraceDB struct { diff --git a/les/odr_test.go b/les/odr_test.go index 2cc28e3844..a1d5479563 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -38,7 +38,7 @@ import ( type odrTestFn func(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte -func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, odrGetBlock) } +func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetBlock) } func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { var block *types.Block @@ -54,7 +54,7 @@ func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainCon return rlp } -func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, odrGetReceipts) } +func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetReceipts) } func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { var receipts types.Receipts @@ -74,7 +74,7 @@ func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.Chain return rlp } -func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, odrAccounts) } +func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrAccounts) } func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { dummyAddr := common.HexToAddress("1234567812345678123456781234567812345678") @@ -102,7 +102,7 @@ func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainCon return res } -func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, odrContractCall) } +func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, true, odrContractCall) } type callmsg struct { types.Message @@ -151,8 +151,32 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai return res } +func TestOdrTxStatusLes2(t *testing.T) { testOdr(t, 2, 1, false, odrTxStatus) } + +func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { + var txs types.Transactions + if bc != nil { + block := bc.GetBlockByHash(bhash) + txs = block.Transactions() + } else { + if block, _ := lc.GetBlockByHash(ctx, bhash); block != nil { + btxs := block.Transactions() + txs = make(types.Transactions, len(btxs)) + for i, tx := range btxs { + var err error + txs[i], _, _, _, err = light.GetTransaction(ctx, lc.Odr(), tx.Hash()) + if err != nil { + return nil + } + } + } + } + rlp, _ := rlp.EncodeToBytes(txs) + return rlp +} + // testOdr tests odr requests whose validation guaranteed by block headers. -func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { +func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) { // Assemble the test environment server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true) defer tearDown() @@ -193,9 +217,10 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true } client.peers.lock.Unlock() test(5) - - // still expect all retrievals to pass, now data should be cached locally - client.peers.Unregister(client.rPeer.id) - time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - test(5) + if checkCached { + // still expect all retrievals to pass, now data should be cached locally + client.peers.Unregister(client.rPeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed + test(5) + } } diff --git a/les/peer.go b/les/peer.go index 7a163cd1d0..42c13ab7d1 100644 --- a/les/peer.go +++ b/les/peer.go @@ -317,7 +317,7 @@ func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply } // ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested. -func (p *peer) ReplyTxStatus(reqID uint64, stats []txStatus) *reply { +func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply { data, _ := rlp.EncodeToBytes(stats) return &reply{p.rw, TxStatusMsg, reqID, data} } diff --git a/les/protocol.go b/les/protocol.go index 86e450d01c..24dc9bdeae 100644 --- a/les/protocol.go +++ b/les/protocol.go @@ -24,8 +24,6 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" @@ -227,9 +225,3 @@ type CodeData []struct { } type proofsData [][]rlp.RawValue - -type txStatus struct { - Status core.TxStatus - Lookup *rawdb.LegacyTxLookupEntry `rlp:"nil"` - Error string -} diff --git a/les/txrelay.go b/les/txrelay.go index a790bbec9c..5ebef1c226 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -17,6 +17,7 @@ package les import ( + "context" "sync" "github.com/ethereum/go-ethereum/common" @@ -36,21 +37,27 @@ type LesTxRelay struct { peerList []*peer peerStartPos int lock sync.RWMutex + stop chan struct{} - reqDist *requestDistributor + retriever *retrieveManager } -func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay { +func NewLesTxRelay(ps *peerSet, retriever *retrieveManager) *LesTxRelay { r := &LesTxRelay{ txSent: make(map[common.Hash]*ltrInfo), txPending: make(map[common.Hash]struct{}), ps: ps, - reqDist: reqDist, + retriever: retriever, + stop: make(chan struct{}), } ps.notify(r) return r } +func (self *LesTxRelay) Stop() { + close(self.stop) +} + func (self *LesTxRelay) registerPeer(p *peer) { self.lock.Lock() defer self.lock.Unlock() @@ -132,7 +139,7 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { return func() { peer.SendTxs(reqID, cost, enc) } }, } - self.reqDist.queue(rq) + go self.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, self.stop) } } diff --git a/light/odr.go b/light/odr.go index 95f1948e75..d1185e4e07 100644 --- a/light/odr.go +++ b/light/odr.go @@ -175,3 +175,20 @@ func (req *BloomRequest) StoreResult(db ethdb.Database) { rawdb.WriteBloomBits(db, req.BitIdx, sectionIdx, sectionHead, req.BloomBits[i]) } } + +// TxStatus describes the status of a transaction +type TxStatus struct { + Status core.TxStatus + Lookup *rawdb.LegacyTxLookupEntry `rlp:"nil"` + Error string +} + +// TxStatusRequest is the ODR request type for retrieving transaction status +type TxStatusRequest struct { + OdrRequest + Hashes []common.Hash + Status []TxStatus +} + +// StoreResult stores the retrieved data in local database +func (req *TxStatusRequest) StoreResult(db ethdb.Database) {} diff --git a/light/odr_util.go b/light/odr_util.go index cce532f8e7..100bd58428 100644 --- a/light/odr_util.go +++ b/light/odr_util.go @@ -21,6 +21,7 @@ import ( "context" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -227,3 +228,23 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi return result, nil } } + +// GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain +func GetTransaction(ctx context.Context, odr OdrBackend, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { + r := &TxStatusRequest{Hashes: []common.Hash{txHash}} + if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded { + return nil, common.Hash{}, 0, 0, err + } else { + pos := r.Status[0].Lookup + // first ensure that we have the header, otherwise block body retrieval will fail + // also verify if this is a canonical block by getting the header by number and checking its hash + if header, err := GetHeaderByNumber(ctx, odr, pos.BlockIndex); err != nil || header.Hash() != pos.BlockHash { + return nil, common.Hash{}, 0, 0, err + } + if body, err := GetBody(ctx, odr, pos.BlockHash, pos.BlockIndex); err != nil || uint64(len(body.Transactions)) <= pos.Index || body.Transactions[pos.Index].Hash() != txHash { + return nil, common.Hash{}, 0, 0, err + } else { + return body.Transactions[pos.Index], pos.BlockHash, pos.BlockIndex, pos.Index, nil + } + } +}