@@ -19,6 +19,7 @@ package blobpool
import (
import (
"container/heap"
"container/heap"
"errors"
"fmt"
"fmt"
"math"
"math"
"math/big"
"math/big"
@@ -35,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics"
@@ -83,16 +83,6 @@ const (
limboedTransactionStore = "limbo"
limboedTransactionStore = "limbo"
)
)
// blobTx is a wrapper around types.BlobTx which also contains the literal blob
// data along with all the transaction metadata.
type blobTx struct {
Tx * types . Transaction
Blobs [ ] kzg4844 . Blob
Commits [ ] kzg4844 . Commitment
Proofs [ ] kzg4844 . Proof
}
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
// schedule the blob transactions into the following blocks. Only ever add the
// schedule the blob transactions into the following blocks. Only ever add the
// bare minimum needed fields to keep the size down (and thus number of entries
// bare minimum needed fields to keep the size down (and thus number of entries
@@ -455,22 +445,27 @@ func (p *BlobPool) Close() error {
// parseTransaction is a callback method on pool creation that gets called for
// parseTransaction is a callback method on pool creation that gets called for
// each transaction on disk to create the in-memory metadata index.
// each transaction on disk to create the in-memory metadata index.
func ( p * BlobPool ) parseTransaction ( id uint64 , size uint32 , blob [ ] byte ) error {
func ( p * BlobPool ) parseTransaction ( id uint64 , size uint32 , blob [ ] byte ) error {
item := new ( blobTx )
tx := new ( types . Transaction )
if err := rlp . DecodeBytes ( blob , item ) ; err != nil {
if err := rlp . DecodeBytes ( blob , tx ) ; err != nil {
// This path is impossible unless the disk data representation changes
// This path is impossible unless the disk data representation changes
// across restarts. For that ever improbable case, recover gracefully
// across restarts. For that ever improbable case, recover gracefully
// by ignoring this data entry.
// by ignoring this data entry.
log . Error ( "Failed to decode blob pool entry" , "id" , id , "err" , err )
log . Error ( "Failed to decode blob pool entry" , "id" , id , "err" , err )
return err
return err
}
}
meta := newBlobTxMeta ( id , size , item . Tx )
if tx . BlobTxSidecar ( ) == nil {
log . Error ( "Missing sidecar in blob pool entry" , "id" , id , "hash" , tx . Hash ( ) )
return errors . New ( "missing blob sidecar" )
}
meta := newBlobTxMeta ( id , size , tx )
sender , err := p . signer . Sender ( item . Tx )
sender , err := p . signer . Sender ( tx )
if err != nil {
if err != nil {
// This path is impossible unless the signature validity changes across
// This path is impossible unless the signature validity changes across
// restarts. For that ever improbable case, recover gracefully by ignoring
// restarts. For that ever improbable case, recover gracefully by ignoring
// this data entry.
// this data entry.
log . Error ( "Failed to recover blob tx sender" , "id" , id , "hash" , item . Tx . Hash ( ) , "err" , err )
log . Error ( "Failed to recover blob tx sender" , "id" , id , "hash" , tx . Hash ( ) , "err" , err )
return err
return err
}
}
if _ , ok := p . index [ sender ] ; ! ok {
if _ , ok := p . index [ sender ] ; ! ok {
@@ -718,17 +713,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
log . Error ( "Blobs missing for included transaction" , "from" , addr , "nonce" , nonce , "id" , id , "err" , err )
log . Error ( "Blobs missing for included transaction" , "from" , addr , "nonce" , nonce , "id" , id , "err" , err )
return
return
}
}
item := new ( blobTx )
var tx types . Transaction
if err = rlp . DecodeBytes ( data , item ) ; err != nil {
if err = rlp . DecodeBytes ( data , & tx ) ; err != nil {
log . Error ( "Blobs corrupted for included transaction" , "from" , addr , "nonce" , nonce , "id" , id , "err" , err )
log . Error ( "Blobs corrupted for included transaction" , "from" , addr , "nonce" , nonce , "id" , id , "err" , err )
return
return
}
}
block , ok := inclusions [ item . Tx . Hash ( ) ]
block , ok := inclusions [ tx . Hash ( ) ]
if ! ok {
if ! ok {
log . Warn ( "Blob transaction swapped out by signer" , "from" , addr , "nonce" , nonce , "id" , id )
log . Warn ( "Blob transaction swapped out by signer" , "from" , addr , "nonce" , nonce , "id" , id )
return
return
}
}
if err := p . limbo . push ( item . Tx . Hash ( ) , block , item . Blobs , item . Commits , item . Proofs ) ; err != nil {
if err := p . limbo . push ( & tx , block ) ; err != nil {
log . Warn ( "Failed to offload blob tx into limbo" , "err" , err )
log . Warn ( "Failed to offload blob tx into limbo" , "err" , err )
return
return
}
}
@@ -760,7 +755,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
for addr , txs := range reinject {
for addr , txs := range reinject {
// Blindly push all the lost transactions back into the pool
// Blindly push all the lost transactions back into the pool
for _ , tx := range txs {
for _ , tx := range txs {
p . reinject ( addr , tx )
p . reinject ( addr , tx . Hash ( ) )
}
}
// Recheck the account's pooled transactions to drop included and
// Recheck the account's pooled transactions to drop included and
// invalidated ones
// invalidated ones
@@ -920,16 +915,19 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
// transactions are injected back into the pool.
func ( p * BlobPool ) reinject ( addr common . Address , tx * types . Transaction ) {
func ( p * BlobPool ) reinject ( addr common . Address , txhash common . Hash ) {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
// add the transaction back into the pool as it is not mineable.
blobs , commits , proofs , err := p . limbo . pull ( tx . Hash ( ) )
tx , err := p . limbo . pull ( txhash )
if err != nil {
if err != nil {
log . Error ( "Blobs unavailable, dropping reorged tx" , "err" , err )
log . Error ( "Blobs unavailable, dropping reorged tx" , "err" , err )
return
return
}
}
// Serialize the transaction back into the primary datastore
// TODO: seems like an easy optimization here would be getting the serialized tx
blob , err := rlp . EncodeToBytes ( & blobTx { Tx : tx , Blobs : blobs , Commits : commits , Proofs : proofs } )
// from limbo instead of re-serializing it here.
// Serialize the transaction back into the primary datastore.
blob , err := rlp . EncodeToBytes ( tx )
if err != nil {
if err != nil {
log . Error ( "Failed to encode transaction for storage" , "hash" , tx . Hash ( ) , "err" , err )
log . Error ( "Failed to encode transaction for storage" , "hash" , tx . Hash ( ) , "err" , err )
return
return
@@ -939,9 +937,9 @@ func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) {
log . Error ( "Failed to write transaction into storage" , "hash" , tx . Hash ( ) , "err" , err )
log . Error ( "Failed to write transaction into storage" , "hash" , tx . Hash ( ) , "err" , err )
return
return
}
}
// Update the indices and metrics
// Update the indices and metrics
meta := newBlobTxMeta ( id , p . store . Size ( id ) , tx )
meta := newBlobTxMeta ( id , p . store . Size ( id ) , tx )
if _ , ok := p . index [ addr ] ; ! ok {
if _ , ok := p . index [ addr ] ; ! ok {
if err := p . reserve ( addr , true ) ; err != nil {
if err := p . reserve ( addr , true ) ; err != nil {
log . Warn ( "Failed to reserve account for blob pool" , "tx" , tx . Hash ( ) , "from" , addr , "err" , err )
log . Warn ( "Failed to reserve account for blob pool" , "tx" , tx . Hash ( ) , "from" , addr , "err" , err )
@@ -1023,7 +1021,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
// validateTx checks whether a transaction is valid according to the consensus
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
// rules and adheres to some heuristic limits of the local node (price and size).
func ( p * BlobPool ) validateTx ( tx * types . Transaction , blobs [ ] kzg4844 . Blob , commits [ ] kzg4844 . Commitment , proofs [ ] kzg4844 . Proof ) error {
func ( p * BlobPool ) validateTx ( tx * types . Transaction ) error {
// Ensure the transaction adheres to basic pool filters (type, size, tip) and
// Ensure the transaction adheres to basic pool filters (type, size, tip) and
// consensus rules
// consensus rules
baseOpts := & txpool . ValidationOptions {
baseOpts := & txpool . ValidationOptions {
@@ -1032,7 +1030,7 @@ func (p *BlobPool) validateTx(tx *types.Transaction, blobs []kzg4844.Blob, commi
MaxSize : txMaxSize ,
MaxSize : txMaxSize ,
MinTip : p . gasTip . ToBig ( ) ,
MinTip : p . gasTip . ToBig ( ) ,
}
}
if err := txpool . ValidateTransaction ( tx , blobs , commits , proofs , p . head , p . signer , baseOpts ) ; err != nil {
if err := txpool . ValidateTransaction ( tx , p . head , p . signer , baseOpts ) ; err != nil {
return err
return err
}
}
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
@@ -1117,7 +1115,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
}
}
// Get returns a transaction if it is contained in the pool, or nil otherwise.
// Get returns a transaction if it is contained in the pool, or nil otherwise.
func ( p * BlobPool ) Get ( hash common . Hash ) * txpool . Transaction {
func ( p * BlobPool ) Get ( hash common . Hash ) * types . Transaction {
// Track the amount of time waiting to retrieve a fully resolved blob tx from
// Track the amount of time waiting to retrieve a fully resolved blob tx from
// the pool and the amount of time actually spent on pulling the data from disk.
// the pool and the amount of time actually spent on pulling the data from disk.
getStart := time . Now ( )
getStart := time . Now ( )
@@ -1139,32 +1137,27 @@ func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
log . Error ( "Tracked blob transaction missing from store" , "hash" , hash , "id" , id , "err" , err )
log . Error ( "Tracked blob transaction missing from store" , "hash" , hash , "id" , id , "err" , err )
return nil
return nil
}
}
item := new ( blobTx )
item := new ( types . Transaction )
if err = rlp . DecodeBytes ( data , item ) ; err != nil {
if err = rlp . DecodeBytes ( data , item ) ; err != nil {
log . Error ( "Blobs corrupted for traced transaction" , "hash" , hash , "id" , id , "err" , err )
log . Error ( "Blobs corrupted for traced transaction" , "hash" , hash , "id" , id , "err" , err )
return nil
return nil
}
}
return & txpool . Transaction {
return item
Tx : item . Tx ,
BlobTxBlobs : item . Blobs ,
BlobTxCommits : item . Commits ,
BlobTxProofs : item . Proofs ,
}
}
}
// Add inserts a set of blob transactions into the pool if they pass validation (both
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
// consensus validity and pool restrictions).
func ( p * BlobPool ) Add ( txs [ ] * txpool . Transaction , local bool , sync bool ) [ ] error {
func ( p * BlobPool ) Add ( txs [ ] * types . Transaction , local bool , sync bool ) [ ] error {
errs := make ( [ ] error , len ( txs ) )
errs := make ( [ ] error , len ( txs ) )
for i , tx := range txs {
for i , tx := range txs {
errs [ i ] = p . add ( tx . Tx , tx . BlobTxBlobs , tx . BlobTxCommits , tx . BlobTxProofs )
errs [ i ] = p . add ( tx )
}
}
return errs
return errs
}
}
// Add inserts a new blob transaction into the pool if it passes validation (both
// Add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions).
// consensus validity and pool restrictions).
func ( p * BlobPool ) add ( tx * types . Transaction , blobs [ ] kzg4844 . Blob , commits [ ] kzg4844 . Commitment , proofs [ ] kzg4844 . Proof ) ( err error ) {
func ( p * BlobPool ) add ( tx * types . Transaction ) ( err error ) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// The blob pool blocks on adding a transaction. This is because blob txs are
// only ever pulled from the network, so this method will act as the overload
// only ever pulled from the network, so this method will act as the overload
// protection for fetches.
// protection for fetches.
@@ -1178,7 +1171,7 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
} ( time . Now ( ) )
} ( time . Now ( ) )
// Ensure the transaction is valid from all perspectives
// Ensure the transaction is valid from all perspectives
if err := p . validateTx ( tx , blobs , commits , proofs ) ; err != nil {
if err := p . validateTx ( tx ) ; err != nil {
log . Trace ( "Transaction validation failed" , "hash" , tx . Hash ( ) , "err" , err )
log . Trace ( "Transaction validation failed" , "hash" , tx . Hash ( ) , "err" , err )
return err
return err
}
}
@@ -1203,7 +1196,7 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
}
}
// Transaction permitted into the pool from a nonce and cost perspective,
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
// insert it into the database and update the indices
blob , err := rlp . EncodeToBytes ( & blobTx { Tx : tx , Blobs : blobs , Commits : commits , Proofs : proofs } )
blob , err := rlp . EncodeToBytes ( tx )
if err != nil {
if err != nil {
log . Error ( "Failed to encode transaction for storage" , "hash" , tx . Hash ( ) , "err" , err )
log . Error ( "Failed to encode transaction for storage" , "hash" , tx . Hash ( ) , "err" , err )
return err
return err