From 0a623896b1699c0e7c518d4ac5dfda7097beb23d Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 1 Mar 2024 16:44:11 +0100 Subject: [PATCH] eth/protocols/eth: started work on eth/69, drop receipt blooms --- cmd/devp2p/internal/ethtest/conn.go | 10 ++- core/blockchain_reader.go | 14 ++++ core/rawdb/accessors_chain.go | 4 +- core/rawdb/accessors_chain_test.go | 6 +- core/types/receipt.go | 6 +- core/types/receipt_test.go | 8 ++ eth/downloader/downloader_test.go | 2 +- eth/protocols/eth/handler.go | 26 ++++++- eth/protocols/eth/handler_test.go | 71 ++++++++++++++++++ eth/protocols/eth/handlers.go | 109 ++++++++++++++++++++++++++-- eth/protocols/eth/handshake.go | 9 ++- eth/protocols/eth/protocol.go | 5 +- 12 files changed, 247 insertions(+), 23 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index 757b137aa1..b2778d6486 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -66,10 +66,10 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) { return nil, err } conn.caps = []p2p.Cap{ - {Name: "eth", Version: 67}, {Name: "eth", Version: 68}, + {Name: "eth", Version: 69}, } - conn.ourHighestProtoVersion = 68 + conn.ourHighestProtoVersion = 69 return &conn, nil } @@ -316,8 +316,10 @@ loop: return fmt.Errorf("wrong head block in status, want: %#x (block %d) have %#x", want, chain.blocks[chain.Len()-1].NumberU64(), have) } - if have, want := msg.TD.Cmp(chain.TD()), 0; have != want { - return fmt.Errorf("wrong TD in status: have %v want %v", have, want) + if c.negotiatedProtoVersion < 69 { + if have, want := msg.TD.Cmp(chain.TD()), 0; have != want { + return fmt.Errorf("wrong TD in status: have %v want %v", have, want) + } } if have, want := msg.ForkID, chain.ForkID(); !reflect.DeepEqual(have, want) { return fmt.Errorf("wrong fork ID in status: have %v, want %v", have, want) diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 19c1b17f36..b0bc416b07 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -230,6 +230,20 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { return receipts } +// GetRawReceiptsByHash retrieves the receipts for all transactions in a given block +// without deriving the internal fields and the Bloom. +func (bc *BlockChain) GetRawReceiptsByHash(hash common.Hash) rlp.RawValue { + number := rawdb.ReadHeaderNumber(bc.db, hash) + if number == nil { + return nil + } + receipts := rawdb.ReadReceiptsRLP(bc.db, hash, *number) + if receipts == nil { + return nil + } + return receipts +} + // GetUnclesInChain retrieves all the uncles from a given block backwards until // a specific distance is reached. func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index c4735c850c..23d879c82f 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -585,8 +585,8 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa } // ReadRawReceipts retrieves all the transaction receipts belonging to a block. -// The receipt metadata fields are not guaranteed to be populated, so they -// should not be used. Use ReadReceipts instead if the metadata is needed. +// The receipt metadata fields and the Bloom are not guaranteed to be populated, +// so they should not be used. Use ReadReceipts instead if the metadata is needed. func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts { // Retrieve the flattened receipt slice data := ReadReceiptsRLP(db, hash, number) diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 0e986f66e0..669f97163c 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -400,7 +400,11 @@ func TestBlockReceiptStorage(t *testing.T) { t.Fatalf("receipts returned when body was deleted: %v", rs) } // Ensure that receipts without metadata can be returned without the block body too - if err := checkReceiptsRLP(ReadRawReceipts(db, hash, 0), receipts); err != nil { + raw := ReadRawReceipts(db, hash, 0) + for _, r := range raw { + r.Bloom = types.CreateBloom(types.Receipts{r}) + } + if err := checkReceiptsRLP(raw, receipts); err != nil { t.Fatal(err) } // Sanity check that body alone without the receipt is a full purge diff --git a/core/types/receipt.go b/core/types/receipt.go index 4f96fde59c..939d1cf621 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -258,7 +258,7 @@ func (r *Receipt) Size() common.StorageSize { } // ReceiptForStorage is a wrapper around a Receipt with RLP serialization -// that omits the Bloom field and deserialization that re-computes it. +// that omits the Bloom field. The Bloom field is recomputed by DeriveFields. type ReceiptForStorage Receipt // EncodeRLP implements rlp.Encoder, and flattens all content fields of a receipt @@ -291,7 +291,6 @@ func (r *ReceiptForStorage) DecodeRLP(s *rlp.Stream) error { } r.CumulativeGasUsed = stored.CumulativeGasUsed r.Logs = stored.Logs - r.Bloom = CreateBloom(Receipts{(*Receipt)(r)}) return nil } @@ -372,6 +371,9 @@ func (rs Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, nu rs[i].Logs[j].Index = logIndex logIndex++ } + // also derive the Bloom if not derived yet + rs[i].Bloom = CreateBloom(Receipts{rs[i]}) } + return nil } diff --git a/core/types/receipt_test.go b/core/types/receipt_test.go index fc51eb11a5..25dbf27aca 100644 --- a/core/types/receipt_test.go +++ b/core/types/receipt_test.go @@ -296,6 +296,13 @@ var ( } ) +func init() { + // Correctly compute the bloom filters + for _, receipt := range receipts { + receipt.Bloom = CreateBloom(Receipts{receipt}) + } +} + func TestDecodeEmptyTypedReceipt(t *testing.T) { input := []byte{0x80} var r Receipt @@ -511,6 +518,7 @@ func clearComputedFieldsOnReceipt(receipt *Receipt) *Receipt { cpy.EffectiveGasPrice = big.NewInt(0) cpy.BlobGasUsed = 0 cpy.BlobGasPrice = nil + cpy.Bloom = CreateBloom(Receipts{&cpy}) return &cpy } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 47c89bf768..b7c912eb1c 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -262,7 +262,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et // peer in the download tester. The returned function can be used to retrieve // batches of block receipts from the particularly requested peer. func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) { - blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes) + blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes) receipts := make([][]*types.Receipt, len(blobs)) for i, blob := range blobs { diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index dc32559c47..0a71262f2c 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -168,8 +168,21 @@ var eth68 = map[uint64]msgHandler{ BlockHeadersMsg: handleBlockHeaders, GetBlockBodiesMsg: handleGetBlockBodies, BlockBodiesMsg: handleBlockBodies, - GetReceiptsMsg: handleGetReceipts, - ReceiptsMsg: handleReceipts, + GetReceiptsMsg: handleGetReceipts68, + ReceiptsMsg: handleReceipts68, + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, +} + +var eth69 = map[uint64]msgHandler{ + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetReceiptsMsg: handleGetReceipts69, + ReceiptsMsg: handleReceipts69, GetPooledTransactionsMsg: handleGetPooledTransactions, PooledTransactionsMsg: handlePooledTransactions, } @@ -187,7 +200,14 @@ func handleMessage(backend Backend, peer *Peer) error { } defer msg.Discard() - var handlers = eth68 + var handlers map[uint64]msgHandler + if peer.version == ETH68 { + handlers = eth68 + } else if peer.version == ETH69 { + handlers = eth69 + } else { + return fmt.Errorf("unknown eth protocol version: %v", peer.version) + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index d0509f2f3d..c1670f99b2 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -18,6 +18,7 @@ package eth import ( "bytes" + "io" "math" "math/big" "math/rand" @@ -576,3 +577,73 @@ func FuzzEthProtocolHandlers(f *testing.F) { handler(backend, decoder{msg: msg}, peer.Peer) }) } + +type receiptForNetwork struct { + Type byte + Status uint64 + GasUsed uint64 + Logs []*types.Log +} + +func (r *receiptForNetwork) EncodeRLP(_w io.Writer) error { + data := &types.ReceiptForStorage{Status: r.Status, CumulativeGasUsed: r.GasUsed, Logs: r.Logs} + if r.Type == types.LegacyTxType { + return rlp.Encode(_w, data) + } + w := rlp.NewEncoderBuffer(_w) + outerList := w.List() + w.Write([]byte{r.Type}) + if r.Status == types.ReceiptStatusSuccessful { + w.Write([]byte{0x01}) + } else { + w.Write([]byte{0x00}) + } + w.WriteUint64(r.GasUsed) + logList := w.List() + for _, log := range r.Logs { + if err := log.EncodeRLP(w); err != nil { + return err + } + } + w.ListEnd(logList) + w.ListEnd(outerList) + return w.Flush() +} + +func TestTransformReceipts(t *testing.T) { + tests := []struct { + input []types.ReceiptForStorage + txs []*types.Transaction + output []receiptForNetwork + }{ + { + input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: nil}}, + txs: []*types.Transaction{types.NewTx(&types.LegacyTx{})}, + output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: nil}}, + }, + { + input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: nil}}, + txs: []*types.Transaction{types.NewTx(&types.DynamicFeeTx{})}, + output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: nil, Type: 2}}, + }, + { + input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: nil}}, + txs: []*types.Transaction{types.NewTx(&types.AccessListTx{})}, + output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: nil, Type: 1}}, + }, + { + input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: []*types.Log{{Address: common.Address{1}, Topics: []common.Hash{{1}}}}}}, + txs: []*types.Transaction{types.NewTx(&types.AccessListTx{})}, + output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: []*types.Log{{Address: common.Address{1}, Topics: []common.Hash{{1}}}}, Type: 1}}, + }, + } + + for i, test := range tests { + in, _ := rlp.EncodeToBytes(test.input) + have := transformReceipts(in, test.txs) + out, _ := rlp.EncodeToBytes(test.output) + if !bytes.Equal(have, out) { + t.Fatalf("transforming receipt mismatch, test %v: want %v have %v", i, out, have) + } + } +} diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index b3886270f3..171593ce6e 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -17,6 +17,7 @@ package eth import ( + "bytes" "encoding/json" "errors" "fmt" @@ -236,19 +237,29 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ return bodies } -func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error { +func handleGetReceipts68(backend Backend, msg Decoder, peer *Peer) error { // Decode the block receipts retrieval message var query GetReceiptsPacket if err := msg.Decode(&query); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } - response := ServiceGetReceiptsQuery(backend.Chain(), query.GetReceiptsRequest) + response := ServiceGetReceiptsQuery68(backend.Chain(), query.GetReceiptsRequest) return peer.ReplyReceiptsRLP(query.RequestId, response) } -// ServiceGetReceiptsQuery assembles the response to a receipt query. It is +func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block receipts retrieval message + var query GetReceiptsPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := serviceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest) + return peer.ReplyReceiptsRLP(query.RequestId, response) +} + +// ServiceGetReceiptsQuery68 assembles the response to a receipt query. It is // exposed to allow external packages to test protocol behavior. -func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue { +func ServiceGetReceiptsQuery68(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue { // Gather state data until the fetch or network limits is reached var ( bytes int @@ -277,6 +288,67 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) [ return receipts } +// serviceGetReceiptsQuery69 assembles the response to a receipt query. +// It does not send the bloom filters for the receipts +func serviceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue { + // Gather state data until the fetch or network limits is reached + var ( + bytes int + receipts []rlp.RawValue + ) + for lookups, hash := range query { + if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe || + lookups >= 2*maxReceiptsServe { + break + } + // Retrieve the requested block's receipts + results := chain.GetRawReceiptsByHash(hash) + if results == nil { + if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } else { + header := chain.GetHeaderByHash(hash) + if header.ReceiptHash != types.EmptyReceiptsHash { + body := new(types.Body) + if err := rlp.DecodeBytes(chain.GetBodyRLP(hash), &body); err == nil { + results = transformReceipts(results, body.Transactions) + } + } + } + receipts = append(receipts, results) + bytes += len(results) + } + return receipts +} + +// transformReceipts takes a slice of rlp-encoded receipts, and transactions, +// and applies the type-encoding on the receipts (for non-legacy receipts). +// e.g. for non-legacy receipts: receipt-data -> {tx-type || receipt-data} +func transformReceipts(blockReceipts []byte, txs []*types.Transaction) []byte { + var ( + out bytes.Buffer + enc = rlp.NewEncoderBuffer(&out) + it, _ = rlp.NewListIterator(blockReceipts) + ) + outer := enc.List() + for i := 0; it.Next(); i++ { + if txs[i].Type() == types.LegacyTxType { + enc.Write(it.Value()) + continue + } + content, _, _ := rlp.SplitList(it.Value()) + receiptList := enc.List() + enc.Write([]byte{txs[i].Type()}) + enc.Write(content) + enc.ListEnd(receiptList) + } + enc.ListEnd(outer) + enc.Flush() + + return out.Bytes() +} + func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { return errors.New("block announcements disallowed") // We dropped support for non-merge networks } @@ -334,12 +406,39 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleReceipts(backend Backend, msg Decoder, peer *Peer) error { +func handleReceipts68(backend Backend, msg Decoder, peer *Peer) error { + // A batch of receipts arrived to one of our previous requests + res := new(ReceiptsPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + metadata := func() interface{} { + hasher := trie.NewStackTrie(nil) + hashes := make([]common.Hash, len(res.ReceiptsResponse)) + for i, receipt := range res.ReceiptsResponse { + hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) + } + return hashes + } + return peer.dispatchResponse(&Response{ + id: res.RequestId, + code: ReceiptsMsg, + Res: &res.ReceiptsResponse, + }, metadata) +} + +func handleReceipts69(backend Backend, msg Decoder, peer *Peer) error { // A batch of receipts arrived to one of our previous requests res := new(ReceiptsPacket) if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } + // calculate the bloom filter before dispatching + for _, receipts := range res.ReceiptsResponse { + for _, receipt := range receipts { + receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) + } + } metadata := func() interface{} { hasher := trie.NewStackTrie(nil) hashes := make([]common.Hash, len(res.ReceiptsResponse)) diff --git a/eth/protocols/eth/handshake.go b/eth/protocols/eth/handshake.go index ea16a85b1e..cc50770f1b 100644 --- a/eth/protocols/eth/handshake.go +++ b/eth/protocols/eth/handshake.go @@ -43,14 +43,17 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis var status StatusPacket // safe to read after two values have been received from errc go func() { - errc <- p2p.Send(p.rw, StatusMsg, &StatusPacket{ + pkt := &StatusPacket{ ProtocolVersion: uint32(p.version), NetworkID: network, - TD: td, Head: head, Genesis: genesis, ForkID: forkID, - }) + } + if p.version == ETH68 { + pkt.TD = td + } + errc <- p2p.Send(p.rw, StatusMsg, pkt) }() go func() { errc <- p.readStatus(network, &status, genesis, forkFilter) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index aeef4330ff..1f2ae7f0a8 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -31,6 +31,7 @@ import ( // Constants to match up protocol versions and messages const ( ETH68 = 68 + ETH69 = 69 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -39,11 +40,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH68} +var ProtocolVersions = []uint{ETH69, ETH68} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH68: 17} +var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024