mirror of https://github.com/ethereum/go-ethereum
core: move tx indexer to its own file (#28857)
This change moves all the transaction indexing functions to a separate txindexer.go file and defines a txIndexer structure as a refactoring.pull/28863/head
parent
542c861b4f
commit
6b0de79935
@ -0,0 +1,220 @@ |
||||
// Copyright 2024 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package core |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
|
||||
"github.com/ethereum/go-ethereum/core/rawdb" |
||||
"github.com/ethereum/go-ethereum/ethdb" |
||||
"github.com/ethereum/go-ethereum/log" |
||||
) |
||||
|
||||
// TxIndexProgress is the struct describing the progress for transaction indexing.
|
||||
type TxIndexProgress struct { |
||||
Indexed uint64 // number of blocks whose transactions are indexed
|
||||
Remaining uint64 // number of blocks whose transactions are not indexed yet
|
||||
} |
||||
|
||||
// Done returns an indicator if the transaction indexing is finished.
|
||||
func (progress TxIndexProgress) Done() bool { |
||||
return progress.Remaining == 0 |
||||
} |
||||
|
||||
// txIndexer is the module responsible for maintaining transaction indexes
|
||||
// according to the configured indexing range by users.
|
||||
type txIndexer struct { |
||||
// limit is the maximum number of blocks from head whose tx indexes
|
||||
// are reserved:
|
||||
// * 0: means the entire chain should be indexed
|
||||
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
|
||||
// and all others shouldn't.
|
||||
limit uint64 |
||||
db ethdb.Database |
||||
progress chan chan TxIndexProgress |
||||
term chan chan struct{} |
||||
closed chan struct{} |
||||
} |
||||
|
||||
// newTxIndexer initializes the transaction indexer.
|
||||
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer { |
||||
indexer := &txIndexer{ |
||||
limit: limit, |
||||
db: chain.db, |
||||
progress: make(chan chan TxIndexProgress), |
||||
term: make(chan chan struct{}), |
||||
closed: make(chan struct{}), |
||||
} |
||||
go indexer.loop(chain) |
||||
|
||||
var msg string |
||||
if limit == 0 { |
||||
msg = "entire chain" |
||||
} else { |
||||
msg = fmt.Sprintf("last %d blocks", limit) |
||||
} |
||||
log.Info("Initialized transaction indexer", "range", msg) |
||||
|
||||
return indexer |
||||
} |
||||
|
||||
// run executes the scheduled indexing/unindexing task in a separate thread.
|
||||
// If the stop channel is closed, the task should be terminated as soon as
|
||||
// possible, the done channel will be closed once the task is finished.
|
||||
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) { |
||||
defer func() { close(done) }() |
||||
|
||||
// Short circuit if chain is empty and nothing to index.
|
||||
if head == 0 { |
||||
return |
||||
} |
||||
// The tail flag is not existent, it means the node is just initialized
|
||||
// and all blocks in the chain (part of them may from ancient store) are
|
||||
// not indexed yet, index the chain according to the configured limit.
|
||||
if tail == nil { |
||||
from := uint64(0) |
||||
if indexer.limit != 0 && head >= indexer.limit { |
||||
from = head - indexer.limit + 1 |
||||
} |
||||
rawdb.IndexTransactions(indexer.db, from, head+1, stop, true) |
||||
return |
||||
} |
||||
// The tail flag is existent (which means indexes in [tail, head] should be
|
||||
// present), while the whole chain are requested for indexing.
|
||||
if indexer.limit == 0 || head < indexer.limit { |
||||
if *tail > 0 { |
||||
// It can happen when chain is rewound to a historical point which
|
||||
// is even lower than the indexes tail, recap the indexing target
|
||||
// to new head to avoid reading non-existent block bodies.
|
||||
end := *tail |
||||
if end > head+1 { |
||||
end = head + 1 |
||||
} |
||||
rawdb.IndexTransactions(indexer.db, 0, end, stop, true) |
||||
} |
||||
return |
||||
} |
||||
// The tail flag is existent, adjust the index range according to configured
|
||||
// limit and the latest chain head.
|
||||
if head-indexer.limit+1 < *tail { |
||||
// Reindex a part of missing indices and rewind index tail to HEAD-limit
|
||||
rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true) |
||||
} else { |
||||
// Unindex a part of stale indices and forward index tail to HEAD-limit
|
||||
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false) |
||||
} |
||||
} |
||||
|
||||
// loop is the scheduler of the indexer, assigning indexing/unindexing tasks depending
|
||||
// on the received chain event.
|
||||
func (indexer *txIndexer) loop(chain *BlockChain) { |
||||
defer close(indexer.closed) |
||||
|
||||
// Listening to chain events and manipulate the transaction indexes.
|
||||
var ( |
||||
stop chan struct{} // Non-nil if background routine is active.
|
||||
done chan struct{} // Non-nil if background routine is active.
|
||||
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
|
||||
|
||||
headCh = make(chan ChainHeadEvent) |
||||
sub = chain.SubscribeChainHeadEvent(headCh) |
||||
) |
||||
defer sub.Unsubscribe() |
||||
|
||||
// Launch the initial processing if chain is not empty (head != genesis).
|
||||
// This step is useful in these scenarios that chain has no progress.
|
||||
if head := rawdb.ReadHeadBlock(indexer.db); head != nil && head.Number().Uint64() != 0 { |
||||
stop = make(chan struct{}) |
||||
done = make(chan struct{}) |
||||
lastHead = head.Number().Uint64() |
||||
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done) |
||||
} |
||||
for { |
||||
select { |
||||
case head := <-headCh: |
||||
if done == nil { |
||||
stop = make(chan struct{}) |
||||
done = make(chan struct{}) |
||||
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done) |
||||
} |
||||
lastHead = head.Block.NumberU64() |
||||
case <-done: |
||||
stop = nil |
||||
done = nil |
||||
case ch := <-indexer.progress: |
||||
ch <- indexer.report(lastHead) |
||||
case ch := <-indexer.term: |
||||
if stop != nil { |
||||
close(stop) |
||||
} |
||||
if done != nil { |
||||
log.Info("Waiting background transaction indexer to exit") |
||||
<-done |
||||
} |
||||
close(ch) |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
// report returns the tx indexing progress.
|
||||
func (indexer *txIndexer) report(head uint64) TxIndexProgress { |
||||
var ( |
||||
remaining uint64 |
||||
tail = rawdb.ReadTxIndexTail(indexer.db) |
||||
) |
||||
total := indexer.limit |
||||
if indexer.limit == 0 || total > head { |
||||
total = head + 1 // genesis included
|
||||
} |
||||
var indexed uint64 |
||||
if tail != nil { |
||||
indexed = head - *tail + 1 |
||||
} |
||||
// The value of indexed might be larger than total if some blocks need
|
||||
// to be unindexed, avoiding a negative remaining.
|
||||
if indexed < total { |
||||
remaining = total - indexed |
||||
} |
||||
return TxIndexProgress{ |
||||
Indexed: indexed, |
||||
Remaining: remaining, |
||||
} |
||||
} |
||||
|
||||
// txIndexProgress retrieves the tx indexing progress, or an error if the
|
||||
// background tx indexer is already stopped.
|
||||
func (indexer *txIndexer) txIndexProgress() (TxIndexProgress, error) { |
||||
ch := make(chan TxIndexProgress, 1) |
||||
select { |
||||
case indexer.progress <- ch: |
||||
return <-ch, nil |
||||
case <-indexer.closed: |
||||
return TxIndexProgress{}, errors.New("indexer is closed") |
||||
} |
||||
} |
||||
|
||||
// close shutdown the indexer. Safe to be called for multiple times.
|
||||
func (indexer *txIndexer) close() { |
||||
ch := make(chan struct{}) |
||||
select { |
||||
case indexer.term <- ch: |
||||
<-ch |
||||
case <-indexer.closed: |
||||
} |
||||
} |
@ -0,0 +1,243 @@ |
||||
// Copyright 2024 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package core |
||||
|
||||
import ( |
||||
"math/big" |
||||
"os" |
||||
"testing" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/consensus/ethash" |
||||
"github.com/ethereum/go-ethereum/core/rawdb" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/crypto" |
||||
"github.com/ethereum/go-ethereum/ethdb" |
||||
"github.com/ethereum/go-ethereum/params" |
||||
) |
||||
|
||||
// TestTxIndexer tests the functionalities for managing transaction indexes.
|
||||
func TestTxIndexer(t *testing.T) { |
||||
var ( |
||||
testBankKey, _ = crypto.GenerateKey() |
||||
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) |
||||
testBankFunds = big.NewInt(1000000000000000000) |
||||
|
||||
gspec = &Genesis{ |
||||
Config: params.TestChainConfig, |
||||
Alloc: GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, |
||||
BaseFee: big.NewInt(params.InitialBaseFee), |
||||
} |
||||
engine = ethash.NewFaker() |
||||
nonce = uint64(0) |
||||
chainHead = uint64(128) |
||||
) |
||||
_, blocks, receipts := GenerateChainWithGenesis(gspec, engine, int(chainHead), func(i int, gen *BlockGen) { |
||||
tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey) |
||||
gen.AddTx(tx) |
||||
nonce += 1 |
||||
}) |
||||
|
||||
// verifyIndexes checks if the transaction indexes are present or not
|
||||
// of the specified block.
|
||||
verifyIndexes := func(db ethdb.Database, number uint64, exist bool) { |
||||
if number == 0 { |
||||
return |
||||
} |
||||
block := blocks[number-1] |
||||
for _, tx := range block.Transactions() { |
||||
lookup := rawdb.ReadTxLookupEntry(db, tx.Hash()) |
||||
if exist && lookup == nil { |
||||
t.Fatalf("missing %d %x", number, tx.Hash().Hex()) |
||||
} |
||||
if !exist && lookup != nil { |
||||
t.Fatalf("unexpected %d %x", number, tx.Hash().Hex()) |
||||
} |
||||
} |
||||
} |
||||
verify := func(db ethdb.Database, expTail uint64, indexer *txIndexer) { |
||||
tail := rawdb.ReadTxIndexTail(db) |
||||
if tail == nil { |
||||
t.Fatal("Failed to write tx index tail") |
||||
} |
||||
if *tail != expTail { |
||||
t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail) |
||||
} |
||||
if *tail != 0 { |
||||
for number := uint64(0); number < *tail; number += 1 { |
||||
verifyIndexes(db, number, false) |
||||
} |
||||
} |
||||
for number := *tail; number <= chainHead; number += 1 { |
||||
verifyIndexes(db, number, true) |
||||
} |
||||
progress := indexer.report(chainHead) |
||||
if !progress.Done() { |
||||
t.Fatalf("Expect fully indexed") |
||||
} |
||||
} |
||||
|
||||
var cases = []struct { |
||||
limitA uint64 |
||||
tailA uint64 |
||||
limitB uint64 |
||||
tailB uint64 |
||||
limitC uint64 |
||||
tailC uint64 |
||||
}{ |
||||
{ |
||||
// LimitA: 0
|
||||
// TailA: 0
|
||||
//
|
||||
// all blocks are indexed
|
||||
limitA: 0, |
||||
tailA: 0, |
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1, |
||||
tailB: 128, |
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64, |
||||
tailC: 65, |
||||
}, |
||||
{ |
||||
// LimitA: 64
|
||||
// TailA: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitA: 64, |
||||
tailA: 65, |
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1, |
||||
tailB: 128, |
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64, |
||||
tailC: 65, |
||||
}, |
||||
{ |
||||
// LimitA: 127
|
||||
// TailA: 2
|
||||
//
|
||||
// block [2, 128] are indexed
|
||||
limitA: 127, |
||||
tailA: 2, |
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1, |
||||
tailB: 128, |
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64, |
||||
tailC: 65, |
||||
}, |
||||
{ |
||||
// LimitA: 128
|
||||
// TailA: 1
|
||||
//
|
||||
// block [2, 128] are indexed
|
||||
limitA: 128, |
||||
tailA: 1, |
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1, |
||||
tailB: 128, |
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64, |
||||
tailC: 65, |
||||
}, |
||||
{ |
||||
// LimitA: 129
|
||||
// TailA: 0
|
||||
//
|
||||
// block [0, 128] are indexed
|
||||
limitA: 129, |
||||
tailA: 0, |
||||
|
||||
// LimitB: 1
|
||||
// TailB: 128
|
||||
//
|
||||
// block-128 is indexed
|
||||
limitB: 1, |
||||
tailB: 128, |
||||
|
||||
// LimitB: 64
|
||||
// TailB: 65
|
||||
//
|
||||
// block [65, 128] are indexed
|
||||
limitC: 64, |
||||
tailC: 65, |
||||
}, |
||||
} |
||||
for _, c := range cases { |
||||
frdir := t.TempDir() |
||||
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) |
||||
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), big.NewInt(0)) |
||||
|
||||
// Index the initial blocks from ancient store
|
||||
indexer := &txIndexer{ |
||||
limit: c.limitA, |
||||
db: db, |
||||
progress: make(chan chan TxIndexProgress), |
||||
} |
||||
indexer.run(nil, 128, make(chan struct{}), make(chan struct{})) |
||||
verify(db, c.tailA, indexer) |
||||
|
||||
indexer.limit = c.limitB |
||||
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{})) |
||||
verify(db, c.tailB, indexer) |
||||
|
||||
indexer.limit = c.limitC |
||||
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{})) |
||||
verify(db, c.tailC, indexer) |
||||
|
||||
// Recover all indexes
|
||||
indexer.limit = 0 |
||||
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{})) |
||||
verify(db, 0, indexer) |
||||
|
||||
db.Close() |
||||
os.RemoveAll(frdir) |
||||
} |
||||
} |
Loading…
Reference in new issue