core/rawdb: single point of maintenance for writing and deleting tx lookup indexes (#21480)

pull/21571/head
Giuseppe Bertone 4 years ago committed by GitHub
parent 4764b2f0be
commit 0185ee0993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      core/blockchain.go
  2. 34
      core/rawdb/accessors_indexes.go
  3. 6
      core/rawdb/accessors_indexes_test.go
  4. 4
      core/rawdb/chain_iterator.go
  5. 2
      light/txpool.go

@ -712,7 +712,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// Add the block to the canonical chain number scheme and mark as the head // Add the block to the canonical chain number scheme and mark as the head
batch := bc.db.NewBatch() batch := bc.db.NewBatch()
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteTxLookupEntries(batch, block) rawdb.WriteTxLookupEntriesByBlock(batch, block)
rawdb.WriteHeadBlockHash(batch, block.Hash()) rawdb.WriteHeadBlockHash(batch, block.Hash())
// If the block is better than our head or is on a different chain, force update heads // If the block is better than our head or is on a different chain, force update heads
@ -1217,9 +1217,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// range. In this case, all tx indices of newly imported blocks should be // range. In this case, all tx indices of newly imported blocks should be
// generated. // generated.
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntries(batch, block) rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil { } else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntries(batch, block) rawdb.WriteTxLookupEntriesByBlock(batch, block)
} }
stats.processed++ stats.processed++
} }
@ -1303,7 +1303,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database // Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
rawdb.WriteTxLookupEntries(batch, block) // Always write tx indices for live blocks, we assume they are needed rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed
// Write everything belongs to the blocks into the database. So that // Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts, // we can ensure all components of body is completed(body, receipts,

@ -53,25 +53,29 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
return &entry.BlockIndex return &entry.BlockIndex
} }
// WriteTxLookupEntries stores a positional metadata for every transaction from // writeTxLookupEntry stores a positional metadata for a transaction,
// a block, enabling hash based transaction and receipt lookups. // enabling hash based transaction and receipt lookups.
func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { func writeTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash, numberBytes []byte) {
number := block.Number().Bytes() if err := db.Put(txLookupKey(hash), numberBytes); err != nil {
for _, tx := range block.Transactions() {
if err := db.Put(txLookupKey(tx.Hash()), number); err != nil {
log.Crit("Failed to store transaction lookup entry", "err", err) log.Crit("Failed to store transaction lookup entry", "err", err)
} }
}
} }
// WriteTxLookupEntriesByHash is identical to WriteTxLookupEntries, but does not // WriteTxLookupEntries is identical to WriteTxLookupEntry, but it works on
// require a full types.Block as input. // a list of hashes
func WriteTxLookupEntriesByHash(db ethdb.KeyValueWriter, number uint64, hashes []common.Hash) { func WriteTxLookupEntries(db ethdb.KeyValueWriter, number uint64, hashes []common.Hash) {
numberBytes := new(big.Int).SetUint64(number).Bytes() numberBytes := new(big.Int).SetUint64(number).Bytes()
for _, hash := range hashes { for _, hash := range hashes {
if err := db.Put(txLookupKey(hash), numberBytes); err != nil { writeTxLookupEntry(db, hash, numberBytes)
log.Crit("Failed to store transaction lookup entry", "err", err)
} }
}
// WriteTxLookupEntriesByBlock stores a positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
func WriteTxLookupEntriesByBlock(db ethdb.KeyValueWriter, block *types.Block) {
numberBytes := block.Number().Bytes()
for _, tx := range block.Transactions() {
writeTxLookupEntry(db, tx.Hash(), numberBytes)
} }
} }
@ -83,11 +87,9 @@ func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) {
} }
// DeleteTxLookupEntries removes all transaction lookups for a given block. // DeleteTxLookupEntries removes all transaction lookups for a given block.
func DeleteTxLookupEntriesByHash(db ethdb.KeyValueWriter, hashes []common.Hash) { func DeleteTxLookupEntries(db ethdb.KeyValueWriter, hashes []common.Hash) {
for _, hash := range hashes { for _, hash := range hashes {
if err := db.Delete(txLookupKey(hash)); err != nil { DeleteTxLookupEntry(db, hash)
log.Crit("Failed to delete transaction lookup entry", "err", err)
}
} }
} }

@ -58,12 +58,12 @@ func (h *testHasher) Hash() common.Hash {
func TestLookupStorage(t *testing.T) { func TestLookupStorage(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
writeTxLookupEntries func(ethdb.Writer, *types.Block) writeTxLookupEntriesByBlock func(ethdb.Writer, *types.Block)
}{ }{
{ {
"DatabaseV6", "DatabaseV6",
func(db ethdb.Writer, block *types.Block) { func(db ethdb.Writer, block *types.Block) {
WriteTxLookupEntries(db, block) WriteTxLookupEntriesByBlock(db, block)
}, },
}, },
{ {
@ -110,7 +110,7 @@ func TestLookupStorage(t *testing.T) {
// Insert all the transactions into the database, and verify contents // Insert all the transactions into the database, and verify contents
WriteCanonicalHash(db, block.Hash(), block.NumberU64()) WriteCanonicalHash(db, block.Hash(), block.NumberU64())
WriteBlock(db, block) WriteBlock(db, block)
tc.writeTxLookupEntries(db, block) tc.writeTxLookupEntriesByBlock(db, block)
for i, tx := range txs { for i, tx := range txs {
if txn, hash, number, index := ReadTransaction(db, tx.Hash()); txn == nil { if txn, hash, number, index := ReadTransaction(db, tx.Hash()); txn == nil {

@ -218,7 +218,7 @@ func IndexTransactions(db ethdb.Database, from uint64, to uint64) {
// Next block available, pop it off and index it // Next block available, pop it off and index it
delivery := queue.PopItem().(*blockTxHashes) delivery := queue.PopItem().(*blockTxHashes)
lastNum = delivery.number lastNum = delivery.number
WriteTxLookupEntriesByHash(batch, delivery.number, delivery.hashes) WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
blocks++ blocks++
txs += len(delivery.hashes) txs += len(delivery.hashes)
// If enough data was accumulated in memory or we're at the last block, dump to disk // If enough data was accumulated in memory or we're at the last block, dump to disk
@ -276,7 +276,7 @@ func UnindexTransactions(db ethdb.Database, from uint64, to uint64) {
// Otherwise spin up the concurrent iterator and unindexer // Otherwise spin up the concurrent iterator and unindexer
blocks, txs := 0, 0 blocks, txs := 0, 0
for delivery := range hashesCh { for delivery := range hashesCh {
DeleteTxLookupEntriesByHash(batch, delivery.hashes) DeleteTxLookupEntries(batch, delivery.hashes)
txs += len(delivery.hashes) txs += len(delivery.hashes)
blocks++ blocks++

@ -185,7 +185,7 @@ func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number
if _, err := GetBlockReceipts(ctx, pool.odr, hash, number); err != nil { // ODR caches, ignore results if _, err := GetBlockReceipts(ctx, pool.odr, hash, number); err != nil { // ODR caches, ignore results
return err return err
} }
rawdb.WriteTxLookupEntries(pool.chainDb, block) rawdb.WriteTxLookupEntriesByBlock(pool.chainDb, block)
// Update the transaction pool's state // Update the transaction pool's state
for _, tx := range list { for _, tx := range list {

Loading…
Cancel
Save