|
|
|
@ -36,6 +36,7 @@ import ( |
|
|
|
|
"github.com/ethereum/go-ethereum/core/state" |
|
|
|
|
"github.com/ethereum/go-ethereum/core/txpool" |
|
|
|
|
"github.com/ethereum/go-ethereum/core/types" |
|
|
|
|
"github.com/ethereum/go-ethereum/crypto/kzg4844" |
|
|
|
|
"github.com/ethereum/go-ethereum/event" |
|
|
|
|
"github.com/ethereum/go-ethereum/log" |
|
|
|
|
"github.com/ethereum/go-ethereum/metrics" |
|
|
|
@ -88,9 +89,11 @@ const ( |
|
|
|
|
// bare minimum needed fields to keep the size down (and thus number of entries
|
|
|
|
|
// larger with the same memory consumption).
|
|
|
|
|
type blobTxMeta struct { |
|
|
|
|
hash common.Hash // Transaction hash to maintain the lookup table
|
|
|
|
|
id uint64 // Storage ID in the pool's persistent store
|
|
|
|
|
size uint32 // Byte size in the pool's persistent store
|
|
|
|
|
hash common.Hash // Transaction hash to maintain the lookup table
|
|
|
|
|
vhashes []common.Hash // Blob versioned hashes to maintain the lookup table
|
|
|
|
|
|
|
|
|
|
id uint64 // Storage ID in the pool's persistent store
|
|
|
|
|
size uint32 // Byte size in the pool's persistent store
|
|
|
|
|
|
|
|
|
|
nonce uint64 // Needed to prioritize inclusion order within an account
|
|
|
|
|
costCap *uint256.Int // Needed to validate cumulative balance sufficiency
|
|
|
|
@ -113,6 +116,7 @@ type blobTxMeta struct { |
|
|
|
|
func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta { |
|
|
|
|
meta := &blobTxMeta{ |
|
|
|
|
hash: tx.Hash(), |
|
|
|
|
vhashes: tx.BlobHashes(), |
|
|
|
|
id: id, |
|
|
|
|
size: size, |
|
|
|
|
nonce: tx.Nonce(), |
|
|
|
@ -306,7 +310,7 @@ type BlobPool struct { |
|
|
|
|
state *state.StateDB // Current state at the head of the chain
|
|
|
|
|
gasTip *uint256.Int // Currently accepted minimum gas tip
|
|
|
|
|
|
|
|
|
|
lookup map[common.Hash]uint64 // Lookup table mapping hashes to tx billy entries
|
|
|
|
|
lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries
|
|
|
|
|
index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
|
|
|
|
|
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
|
|
|
|
|
evict *evictHeap // Heap of cheapest accounts for eviction when full
|
|
|
|
@ -328,7 +332,7 @@ func New(config Config, chain BlockChain) *BlobPool { |
|
|
|
|
config: config, |
|
|
|
|
signer: types.LatestSigner(chain.Config()), |
|
|
|
|
chain: chain, |
|
|
|
|
lookup: make(map[common.Hash]uint64), |
|
|
|
|
lookup: newLookup(), |
|
|
|
|
index: make(map[common.Address][]*blobTxMeta), |
|
|
|
|
spent: make(map[common.Address]*uint256.Int), |
|
|
|
|
} |
|
|
|
@ -471,7 +475,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
meta := newBlobTxMeta(id, size, tx) |
|
|
|
|
if _, exists := p.lookup[meta.hash]; exists { |
|
|
|
|
if p.lookup.exists(meta.hash) { |
|
|
|
|
// This path is only possible after a crash, where deleted items are not
|
|
|
|
|
// removed via the normal shutdown-startup procedure and thus may get
|
|
|
|
|
// partially resurrected.
|
|
|
|
@ -496,9 +500,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { |
|
|
|
|
p.index[sender] = append(p.index[sender], meta) |
|
|
|
|
p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap) |
|
|
|
|
|
|
|
|
|
p.lookup[meta.hash] = meta.id |
|
|
|
|
p.lookup.track(meta) |
|
|
|
|
p.stored += uint64(meta.size) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -531,7 +534,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 |
|
|
|
|
nonces = append(nonces, txs[i].nonce) |
|
|
|
|
|
|
|
|
|
p.stored -= uint64(txs[i].size) |
|
|
|
|
delete(p.lookup, txs[i].hash) |
|
|
|
|
p.lookup.untrack(txs[i]) |
|
|
|
|
|
|
|
|
|
// Included transactions blobs need to be moved to the limbo
|
|
|
|
|
if filled && inclusions != nil { |
|
|
|
@ -572,7 +575,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 |
|
|
|
|
|
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap) |
|
|
|
|
p.stored -= uint64(txs[0].size) |
|
|
|
|
delete(p.lookup, txs[0].hash) |
|
|
|
|
p.lookup.untrack(txs[0]) |
|
|
|
|
|
|
|
|
|
// Included transactions blobs need to be moved to the limbo
|
|
|
|
|
if inclusions != nil { |
|
|
|
@ -621,14 +624,14 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 |
|
|
|
|
// crash would result in previously deleted entities being resurrected.
|
|
|
|
|
// That could potentially cause a duplicate nonce to appear.
|
|
|
|
|
if txs[i].nonce == txs[i-1].nonce { |
|
|
|
|
id := p.lookup[txs[i].hash] |
|
|
|
|
id, _ := p.lookup.storeidOfTx(txs[i].hash) |
|
|
|
|
|
|
|
|
|
log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id) |
|
|
|
|
dropRepeatedMeter.Mark(1) |
|
|
|
|
|
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) |
|
|
|
|
p.stored -= uint64(txs[i].size) |
|
|
|
|
delete(p.lookup, txs[i].hash) |
|
|
|
|
p.lookup.untrack(txs[i]) |
|
|
|
|
|
|
|
|
|
if err := p.store.Delete(id); err != nil { |
|
|
|
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) |
|
|
|
@ -650,7 +653,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 |
|
|
|
|
|
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap) |
|
|
|
|
p.stored -= uint64(txs[j].size) |
|
|
|
|
delete(p.lookup, txs[j].hash) |
|
|
|
|
p.lookup.untrack(txs[j]) |
|
|
|
|
} |
|
|
|
|
txs = txs[:i] |
|
|
|
|
|
|
|
|
@ -688,7 +691,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 |
|
|
|
|
|
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap) |
|
|
|
|
p.stored -= uint64(last.size) |
|
|
|
|
delete(p.lookup, last.hash) |
|
|
|
|
p.lookup.untrack(last) |
|
|
|
|
} |
|
|
|
|
if len(txs) == 0 { |
|
|
|
|
delete(p.index, addr) |
|
|
|
@ -728,7 +731,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 |
|
|
|
|
|
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap) |
|
|
|
|
p.stored -= uint64(last.size) |
|
|
|
|
delete(p.lookup, last.hash) |
|
|
|
|
p.lookup.untrack(last) |
|
|
|
|
} |
|
|
|
|
p.index[addr] = txs |
|
|
|
|
|
|
|
|
@ -1006,7 +1009,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { |
|
|
|
|
p.index[addr] = append(p.index[addr], meta) |
|
|
|
|
p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap) |
|
|
|
|
} |
|
|
|
|
p.lookup[meta.hash] = meta.id |
|
|
|
|
p.lookup.track(meta) |
|
|
|
|
p.stored += uint64(meta.size) |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -1033,7 +1036,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { |
|
|
|
|
) |
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) |
|
|
|
|
p.stored -= uint64(tx.size) |
|
|
|
|
delete(p.lookup, tx.hash) |
|
|
|
|
p.lookup.untrack(tx) |
|
|
|
|
txs[i] = nil |
|
|
|
|
|
|
|
|
|
// Drop everything afterwards, no gaps allowed
|
|
|
|
@ -1043,7 +1046,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { |
|
|
|
|
|
|
|
|
|
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap) |
|
|
|
|
p.stored -= uint64(tx.size) |
|
|
|
|
delete(p.lookup, tx.hash) |
|
|
|
|
p.lookup.untrack(tx) |
|
|
|
|
txs[i+1+j] = nil |
|
|
|
|
} |
|
|
|
|
// Clear out the dropped transactions from the index
|
|
|
|
@ -1171,8 +1174,7 @@ func (p *BlobPool) Has(hash common.Hash) bool { |
|
|
|
|
p.lock.RLock() |
|
|
|
|
defer p.lock.RUnlock() |
|
|
|
|
|
|
|
|
|
_, ok := p.lookup[hash] |
|
|
|
|
return ok |
|
|
|
|
return p.lookup.exists(hash) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get returns a transaction if it is contained in the pool, or nil otherwise.
|
|
|
|
@ -1189,7 +1191,7 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { |
|
|
|
|
}(time.Now()) |
|
|
|
|
|
|
|
|
|
// Pull the blob from disk and return an assembled response
|
|
|
|
|
id, ok := p.lookup[hash] |
|
|
|
|
id, ok := p.lookup.storeidOfTx(hash) |
|
|
|
|
if !ok { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -1206,6 +1208,58 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { |
|
|
|
|
return item |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
|
|
|
|
|
// This is a utility method for the engine API, enabling consensus clients to
|
|
|
|
|
// retrieve blobs from the pools directly instead of the network.
|
|
|
|
|
func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { |
|
|
|
|
// Create a map of the blob hash to indices for faster fills
|
|
|
|
|
var ( |
|
|
|
|
blobs = make([]*kzg4844.Blob, len(vhashes)) |
|
|
|
|
proofs = make([]*kzg4844.Proof, len(vhashes)) |
|
|
|
|
) |
|
|
|
|
index := make(map[common.Hash]int) |
|
|
|
|
for i, vhash := range vhashes { |
|
|
|
|
index[vhash] = i |
|
|
|
|
} |
|
|
|
|
// Iterate over the blob hashes, pulling transactions that fill it. Take care
|
|
|
|
|
// to also fill anything else the transaction might include (probably will).
|
|
|
|
|
for i, vhash := range vhashes { |
|
|
|
|
// If already filled by a previous fetch, skip
|
|
|
|
|
if blobs[i] != nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Unfilled, retrieve the datastore item (in a short lock)
|
|
|
|
|
p.lock.RLock() |
|
|
|
|
id, exists := p.lookup.storeidOfBlob(vhash) |
|
|
|
|
if !exists { |
|
|
|
|
p.lock.RUnlock() |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
data, err := p.store.Get(id) |
|
|
|
|
p.lock.RUnlock() |
|
|
|
|
|
|
|
|
|
// After releasing the lock, try to fill any blobs requested
|
|
|
|
|
if err != nil { |
|
|
|
|
log.Error("Tracked blob transaction missing from store", "id", id, "err", err) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
item := new(types.Transaction) |
|
|
|
|
if err = rlp.DecodeBytes(data, item); err != nil { |
|
|
|
|
log.Error("Blobs corrupted for traced transaction", "id", id, "err", err) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Fill anything requested, not just the current versioned hash
|
|
|
|
|
sidecar := item.BlobTxSidecar() |
|
|
|
|
for j, blobhash := range item.BlobHashes() { |
|
|
|
|
if idx, ok := index[blobhash]; ok { |
|
|
|
|
blobs[idx] = &sidecar.Blobs[j] |
|
|
|
|
proofs[idx] = &sidecar.Proofs[j] |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return blobs, proofs |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Add inserts a set of blob transactions into the pool if they pass validation (both
|
|
|
|
|
// consensus validity and pool restrictions).
|
|
|
|
|
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { |
|
|
|
@ -1319,8 +1373,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { |
|
|
|
|
p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap) |
|
|
|
|
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) |
|
|
|
|
|
|
|
|
|
delete(p.lookup, prev.hash) |
|
|
|
|
p.lookup[meta.hash] = meta.id |
|
|
|
|
p.lookup.untrack(prev) |
|
|
|
|
p.lookup.track(meta) |
|
|
|
|
p.stored += uint64(meta.size) - uint64(prev.size) |
|
|
|
|
} else { |
|
|
|
|
// Transaction extends previously scheduled ones
|
|
|
|
@ -1330,7 +1384,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { |
|
|
|
|
newacc = true |
|
|
|
|
} |
|
|
|
|
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) |
|
|
|
|
p.lookup[meta.hash] = meta.id |
|
|
|
|
p.lookup.track(meta) |
|
|
|
|
p.stored += uint64(meta.size) |
|
|
|
|
} |
|
|
|
|
// Recompute the rolling eviction fields. In case of a replacement, this will
|
|
|
|
@ -1419,7 +1473,7 @@ func (p *BlobPool) drop() { |
|
|
|
|
p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap) |
|
|
|
|
} |
|
|
|
|
p.stored -= uint64(drop.size) |
|
|
|
|
delete(p.lookup, drop.hash) |
|
|
|
|
p.lookup.untrack(drop) |
|
|
|
|
|
|
|
|
|
// Remove the transaction from the pool's eviction heap:
|
|
|
|
|
// - If the entire account was dropped, pop off the address
|
|
|
|
|