From 6eef141aef618afa6bfad885b544f25b68f77cc2 Mon Sep 17 00:00:00 2001 From: gary rong Date: Mon, 13 Jul 2020 17:02:54 +0800 Subject: [PATCH] les: historical data garbage collection (#19570) This change introduces garbage collection for the light client. Historical chain data is deleted periodically. If you want to disable the GC, use the --light.nopruning flag. --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 18 ++- consensus/clique/clique.go | 2 +- core/blockchain.go | 8 +- core/chain_indexer.go | 9 +- core/chain_indexer_test.go | 4 + core/chain_makers.go | 2 +- core/dao_test.go | 8 +- core/genesis.go | 2 +- core/rawdb/accessors_chain.go | 33 +++++ core/rawdb/accessors_chain_test.go | 33 +++++ core/rawdb/accessors_indexes.go | 22 +++ core/rawdb/accessors_indexes_test.go | 45 ++++++ core/rawdb/freezer.go | 8 +- core/state/statedb_test.go | 6 +- eth/api_backend.go | 4 +- eth/bloombits.go | 5 + eth/config.go | 9 +- eth/downloader/downloader.go | 20 ++- eth/downloader/downloader_test.go | 3 +- eth/downloader/testchain_test.go | 2 +- eth/gen_config.go | 6 + graphql/graphql.go | 2 +- internal/ethapi/api.go | 25 ++-- internal/ethapi/backend.go | 2 +- les/api_backend.go | 7 +- les/client.go | 9 +- les/odr_requests.go | 10 +- les/odr_test.go | 2 +- les/pruner.go | 98 +++++++++++++ les/pruner_test.go | 197 +++++++++++++++++++++++++ les/request_test.go | 2 +- les/server.go | 4 +- les/sync_test.go | 4 +- les/test_helper.go | 16 +- les/ulc_test.go | 2 +- light/lightchain.go | 20 ++- light/odr.go | 9 +- light/odr_util.go | 212 ++++++++++++++------------- light/postprocess.go | 159 +++++++++++++++++--- params/network_params.go | 10 +- trie/database.go | 11 +- trie/iterator_test.go | 4 +- trie/trie_test.go | 2 +- 45 files changed, 843 insertions(+), 215 deletions(-) create mode 100644 les/pruner.go create mode 100644 les/pruner_test.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 03ac7bee5..e82e0ec54 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -98,6 +98,7 @@ var ( utils.LightEgressFlag, utils.LightMaxPeersFlag, utils.LegacyLightPeersFlag, + utils.LightNoPruneFlag, utils.LightKDFFlag, utils.UltraLightServersFlag, utils.UltraLightFractionFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index ee97e1a97..05bd28130 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -96,6 +96,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.UltraLightServersFlag, utils.UltraLightFractionFlag, utils.UltraLightOnlyAnnounceFlag, + utils.LightNoPruneFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 8ed86d2fa..c5e2aa8d6 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -282,6 +282,10 @@ var ( Name: "ulc.onlyannounce", Usage: "Ultra light server sends announcements only", } + LightNoPruneFlag = cli.BoolFlag{ + Name: "light.nopruning", + Usage: "Disable ancient light chain data pruning", + } // Ethash settings EthashCacheDirFlag = DirectoryFlag{ Name: "ethash.cachedir", @@ -1070,6 +1074,9 @@ func setLes(ctx *cli.Context, cfg *eth.Config) { if ctx.GlobalIsSet(UltraLightOnlyAnnounceFlag.Name) { cfg.UltraLightOnlyAnnounce = ctx.GlobalBool(UltraLightOnlyAnnounceFlag.Name) } + if ctx.GlobalIsSet(LightNoPruneFlag.Name) { + cfg.LightNoPrune = ctx.GlobalBool(LightNoPruneFlag.Name) + } } // makeDatabaseHandles raises out the number of allowed file handles per process @@ -1800,12 +1807,17 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node) ethdb.Database { var ( cache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 
100 handles = makeDatabaseHandles() + + err error + chainDb ethdb.Database ) - name := "chaindata" if ctx.GlobalString(SyncModeFlag.Name) == "light" { - name = "lightchaindata" + name := "lightchaindata" + chainDb, err = stack.OpenDatabase(name, cache, handles, "") + } else { + name := "chaindata" + chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "") } - chainDb, err := stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "") if err != nil { Fatalf("Could not open database: %v", err) } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 174557531..35542baf4 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -369,7 +369,7 @@ func (c *Clique) snapshot(chain consensus.ChainReader, number uint64, hash commo // at a checkpoint block without a parent (light client CHT), or we have piled // up more headers than allowed to be reorged (chain reinit from a freezer), // consider the checkpoint trusted and snapshot it. - if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.ImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) { + if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.FullImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) { checkpoint := chain.GetHeaderByNumber(number) if checkpoint != nil { hash := checkpoint.Hash() diff --git a/core/blockchain.go b/core/blockchain.go index 0987d65be..7742f4ec2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -901,14 +901,14 @@ func (bc *BlockChain) Stop() { recent := bc.GetBlockByNumber(number - offset) log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) - if err := triedb.Commit(recent.Root(), true); err != nil { + if err := triedb.Commit(recent.Root(), true, nil); err != nil { log.Error("Failed to commit recent state trie", "err", err) } } } if snapBase != (common.Hash{}) { log.Info("Writing snapshot state to disk", "root", snapBase) - if err := triedb.Commit(snapBase, true); err != nil { + if err := triedb.Commit(snapBase, true, nil); err != nil { log.Error("Failed to commit recent state trie", "err", err) } } @@ -1442,7 +1442,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // If we're running an archive node, always flush if bc.cacheConfig.TrieDirtyDisabled { - if err := triedb.Commit(root, false); err != nil { + if err := triedb.Commit(root, false, nil); err != nil { return NonStatTy, err } } else { @@ -1476,7 +1476,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory) } // Flush an entire trie and restart the counters - triedb.Commit(header.Root, true) + triedb.Commit(header.Root, true, nil) lastWrite = chosen bc.gcproc = 0 } diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 1bff3aee7..066bca100 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -46,6 +46,9 @@ type ChainIndexerBackend interface { // Commit finalizes the section metadata and stores it into the database. Commit() error + + // Prune deletes the chain index older than the given threshold. 
+ Prune(threshold uint64) error } // ChainIndexerChain interface is used for connecting the indexer to a blockchain @@ -386,7 +389,6 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing - if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { c.setValidSections(0) return common.Hash{}, err @@ -459,6 +461,11 @@ func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { } } +// Prune deletes all chain data older than given threshold. +func (c *ChainIndexer) Prune(threshold uint64) error { + return c.backend.Prune(threshold) +} + // loadValidSections reads the number of valid sections from the index database // and caches is into the local state. func (c *ChainIndexer) loadValidSections() { diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go index ff7548e7b..b76203dc8 100644 --- a/core/chain_indexer_test.go +++ b/core/chain_indexer_test.go @@ -236,3 +236,7 @@ func (b *testChainIndexBackend) Commit() error { } return nil } + +func (b *testChainIndexBackend) Prune(threshold uint64) error { + return nil +} diff --git a/core/chain_makers.go b/core/chain_makers.go index 6524087d4..33f253d9e 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -220,7 +220,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse if err != nil { panic(fmt.Sprintf("state write error: %v", err)) } - if err := statedb.Database().TrieDB().Commit(root, false); err != nil { + if err := statedb.Database().TrieDB().Commit(root, false, nil); err != nil { panic(fmt.Sprintf("trie write error: %v", err)) } return block, b.receipts diff --git a/core/dao_test.go b/core/dao_test.go index 89e1d83d7..b2a9f624a 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -79,7 +79,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import contra-fork chain for expansion: %v", err) } - if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil { t.Fatalf("failed to commit contra-fork head for expansion: %v", err) } blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) @@ -104,7 +104,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import pro-fork chain for expansion: %v", err) } - if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil { t.Fatalf("failed to commit pro-fork head for expansion: %v", err) } blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) @@ -130,7 +130,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import contra-fork chain for expansion: %v", err) } - if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil { t.Fatalf("failed to commit contra-fork head for expansion: %v", err) } blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) @@ -150,7 +150,7 @@ func TestDAOForkRangeExtradata(t 
*testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import pro-fork chain for expansion: %v", err) } - if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil { t.Fatalf("failed to commit pro-fork head for expansion: %v", err) } blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) diff --git a/core/genesis.go b/core/genesis.go index 655736906..afaa29c42 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -285,7 +285,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { head.Difficulty = params.GenesisDifficulty } statedb.Commit(false) - statedb.Database().TrieDB().Commit(root, true) + statedb.Database().TrieDB().Commit(root, true, nil) return types.NewBlock(head, nil, nil, nil) } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 2290e87d5..8dd1f6345 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -80,6 +80,39 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { return hashes } +// ReadAllCanonicalHashes retrieves all canonical number and hash mappings in the +// given chain range. If the number of accumulated entries reaches the given +// threshold, the iteration is aborted and the partial result is returned. +func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int) ([]uint64, []common.Hash) { + // Short circuit if the limit is 0. + if limit == 0 { + return nil, nil + } + var ( + numbers []uint64 + hashes []common.Hash + ) + // Construct the key prefix of the start point. + start, end := headerHashKey(from), headerHashKey(to) + it := db.NewIterator(nil, start) + defer it.Release() + + for it.Next() { + if bytes.Compare(it.Key(), end) >= 0 { + break + } + if key := it.Key(); len(key) == len(headerPrefix)+8+1 && bytes.Equal(key[len(key)-1:], headerHashSuffix) { + numbers = append(numbers, binary.BigEndian.Uint64(key[len(headerPrefix):len(headerPrefix)+8])) + hashes = append(hashes, common.BytesToHash(it.Value())) + // If the accumulated entries reach the limit threshold, return. + if len(numbers) >= limit { + break + } + } + } + return numbers, hashes +} + // ReadHeaderNumber returns the header number assigned to a hash. func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index bb7dad5df..3eba2a3b4 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -23,6 +23,7 @@ import ( "io/ioutil" "math/big" "os" + "reflect" "testing" "github.com/ethereum/go-ethereum/common" @@ -424,3 +425,35 @@ func TestAncientStorage(t *testing.T) { t.Fatalf("invalid td returned") } } + +func TestCanonicalHashIteration(t *testing.T) { + var cases = []struct { + from, to uint64 + limit int + expect []uint64 + }{ + {1, 8, 0, nil}, + {1, 8, 1, []uint64{1}}, + {1, 8, 10, []uint64{1, 2, 3, 4, 5, 6, 7}}, + {1, 9, 10, []uint64{1, 2, 3, 4, 5, 6, 7, 8}}, + {2, 9, 10, []uint64{2, 3, 4, 5, 6, 7, 8}}, + {9, 10, 10, nil}, + } + // Test empty db iteration + db := NewMemoryDatabase() + numbers, _ := ReadAllCanonicalHashes(db, 0, 10, 10) + if len(numbers) != 0 { + t.Fatalf("No entry should be returned when iterating an empty db") + } + // Fill the database with testing data. 
+ for i := uint64(1); i <= 8; i++ { + WriteCanonicalHash(db, common.Hash{}, i) + WriteTd(db, common.Hash{}, i, big.NewInt(10)) // Write some interfering data + } + for i, c := range cases { + numbers, _ := ReadAllCanonicalHashes(db, c.from, c.to, c.limit) + if !reflect.DeepEqual(numbers, c.expect) { + t.Fatalf("Case %d failed, want %v, got %v", i, c.expect, numbers) + } + } +} diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index c7f3df2ad..9a05eba8d 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -17,6 +17,7 @@ package rawdb import ( + "bytes" "math/big" "github.com/ethereum/go-ethereum/common" @@ -151,3 +152,24 @@ func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head comm log.Crit("Failed to store bloom bits", "err", err) } } + +// DeleteBloombits removes all compressed bloom bits vectors belonging to the +// given section range and bit index. +func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) { + start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{}) + it := db.NewIterator(nil, start) + defer it.Release() + + for it.Next() { + if bytes.Compare(it.Key(), end) >= 0 { + break + } + if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 { + continue + } + db.Delete(it.Key()) + } + if it.Error() != nil { + log.Crit("Failed to delete bloom bits", "err", it.Error()) + } +} diff --git a/core/rawdb/accessors_indexes_test.go b/core/rawdb/accessors_indexes_test.go index c09bff010..49d00f990 100644 --- a/core/rawdb/accessors_indexes_test.go +++ b/core/rawdb/accessors_indexes_test.go @@ -17,12 +17,14 @@ package rawdb import ( + "bytes" "math/big" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -106,3 +108,46 @@ func TestLookupStorage(t *testing.T) { }) } } + +func TestDeleteBloomBits(t *testing.T) { + // Prepare testing data + db := NewMemoryDatabase() + for i := uint(0); i < 2; i++ { + for s := uint64(0); s < 2; s++ { + WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02}) + WriteBloomBits(db, i, s, params.RinkebyGenesisHash, []byte{0x01, 0x02}) + } + } + check := func(bit uint, section uint64, head common.Hash, exist bool) { + bits, _ := ReadBloomBits(db, bit, section, head) + if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) { + t.Fatalf("Bloombits mismatch") + } + if !exist && len(bits) > 0 { + t.Fatalf("Bloombits should be removed") + } + } + // Check the existence of written data. + check(0, 0, params.MainnetGenesisHash, true) + check(0, 0, params.RinkebyGenesisHash, true) + + // Prune section 0 and ensure it is removed. + DeleteBloombits(db, 0, 0, 1) + check(0, 0, params.MainnetGenesisHash, false) + check(0, 0, params.RinkebyGenesisHash, false) + check(0, 1, params.MainnetGenesisHash, true) + check(0, 1, params.RinkebyGenesisHash, true) + + // Prune sections 0 and 1 and ensure both are removed. + DeleteBloombits(db, 0, 0, 2) + check(0, 0, params.MainnetGenesisHash, false) + check(0, 0, params.RinkebyGenesisHash, false) + check(0, 1, params.MainnetGenesisHash, false) + check(0, 1, params.RinkebyGenesisHash, false) + + // Bit 1 shouldn't be affected. 
+ check(1, 0, params.MainnetGenesisHash, true) + check(1, 0, params.RinkebyGenesisHash, true) + check(1, 1, params.MainnetGenesisHash, true) + check(1, 1, params.RinkebyGenesisHash, true) +} diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 01ad281ac..9a40b2cf4 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -287,12 +287,12 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { backoff = true continue - case *number < params.ImmutabilityThreshold: - log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold) + case *number < params.FullImmutabilityThreshold: + log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.FullImmutabilityThreshold) backoff = true continue - case *number-params.ImmutabilityThreshold <= f.frozen: + case *number-params.FullImmutabilityThreshold <= f.frozen: log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) backoff = true continue @@ -304,7 +304,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) { continue } // Seems we have data ready to be frozen, process in usable batches - limit := *number - params.ImmutabilityThreshold + limit := *number - params.FullImmutabilityThreshold if limit-f.frozen > freezerBatchLimit { limit = f.frozen + freezerBatchLimit } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index be2463535..824a59749 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -55,7 +55,7 @@ func TestUpdateLeaks(t *testing.T) { } root := state.IntermediateRoot(false) - if err := state.Database().TrieDB().Commit(root, false); err != nil { + if err := state.Database().TrieDB().Commit(root, false, nil); err != nil { t.Errorf("can not commit trie %v to persistent database", root.Hex()) } @@ -106,7 +106,7 @@ func TestIntermediateLeaks(t *testing.T) { if err != nil { t.Fatalf("failed to commit transition state: %v", err) } - if err = transState.Database().TrieDB().Commit(transRoot, false); err != nil { + if err = transState.Database().TrieDB().Commit(transRoot, false, nil); err != nil { t.Errorf("can not commit trie %v to persistent database", transRoot.Hex()) } @@ -114,7 +114,7 @@ func TestIntermediateLeaks(t *testing.T) { if err != nil { t.Fatalf("failed to commit final state: %v", err) } - if err = finalState.Database().TrieDB().Commit(finalRoot, false); err != nil { + if err = finalState.Database().TrieDB().Commit(finalRoot, false, nil); err != nil { t.Errorf("can not commit trie %v to persistent database", finalRoot.Hex()) } diff --git a/eth/api_backend.go b/eth/api_backend.go index 60ad37e68..a7122da2c 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -185,8 +185,8 @@ func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*typ return logs, nil } -func (b *EthAPIBackend) GetTd(blockHash common.Hash) *big.Int { - return b.eth.blockchain.GetTdByHash(blockHash) +func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { + return b.eth.blockchain.GetTdByHash(hash) } func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) { diff --git a/eth/bloombits.go b/eth/bloombits.go index 35522b9bf..f8b77f9cf 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -136,3 +136,8 @@ func (b *BloomIndexer) Commit() error { } return batch.Write() } + +// Prune returns nil since pruning is not supported by the bloom indexer. 
+func (b *BloomIndexer) Prune(threshold uint64) error { + return nil +} diff --git a/eth/config.go b/eth/config.go index 8547ac177..8b9651e09 100644 --- a/eth/config.go +++ b/eth/config.go @@ -122,10 +122,11 @@ type Config struct { Whitelist map[uint64]common.Hash `toml:"-"` // Light client options - LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests - LightIngress int `toml:",omitempty"` // Incoming bandwidth limit for light servers - LightEgress int `toml:",omitempty"` // Outgoing bandwidth limit for light servers - LightPeers int `toml:",omitempty"` // Maximum number of LES client peers + LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests + LightIngress int `toml:",omitempty"` // Incoming bandwidth limit for light servers + LightEgress int `toml:",omitempty"` // Outgoing bandwidth limit for light servers + LightPeers int `toml:",omitempty"` // Maximum number of LES client peers + LightNoPrune bool `toml:",omitempty"` // Whether to disable light chain pruning // Ultra Light client options UltraLightServers []string `toml:",omitempty"` // List of trusted ultra light servers diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 9e7ea947f..3a289a9c7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -42,7 +42,6 @@ var ( MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly - MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request @@ -56,10 +55,11 @@ var ( qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value - maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxHeadersProcess = 2048 // Number of header download results to import at once into the chain - maxResultsProcess = 2048 // Number of content download results to import at once into the chain - maxForkAncestry uint64 = params.ImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) + maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxHeadersProcess = 2048 // Number of header download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain + fullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) + lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs @@ -490,10 +490,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // The peer would start to feed us valid blocks until head, resulting in all of // the blocks might be written into the ancient store. A following mini-reorg // could cause issues. 
- if d.checkpoint != 0 && d.checkpoint > maxForkAncestry+1 { + if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 { d.ancientLimit = d.checkpoint - } else if height > maxForkAncestry+1 { - d.ancientLimit = height - maxForkAncestry - 1 + } else if height > fullMaxForkAncestry+1 { + d.ancientLimit = height - fullMaxForkAncestry - 1 } frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. // If a part of blockchain data has already been written into active store, @@ -727,6 +727,10 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) // Recap floor value for binary search + maxForkAncestry := fullMaxForkAncestry + if d.getMode() == LightSync { + maxForkAncestry = lightMaxForkAncestry + } if localHeight >= maxForkAncestry { // We're above the max reorg threshold, find the earliest fork point floor = int64(localHeight - maxForkAncestry) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 4750da54d..f9092175c 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -37,7 +37,8 @@ import ( // Reduce some of the parameters to make the tester faster. func init() { - maxForkAncestry = 10000 + fullMaxForkAncestry = 10000 + lightMaxForkAncestry = 10000 blockCacheItems = 1024 fsHeaderContCheck = 500 * time.Millisecond } diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index f410152f5..26b6b6a46 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -45,7 +45,7 @@ var testChainBase = newTestChain(blockCacheItems+200, testGenesis) var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain func init() { - var forkLen = int(maxForkAncestry + 50) + var forkLen = int(fullMaxForkAncestry + 50) var wg sync.WaitGroup wg.Add(3) go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() diff --git a/eth/gen_config.go b/eth/gen_config.go index 8f4a8fa94..2defc36cb 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -29,6 +29,7 @@ func (c Config) MarshalTOML() (interface{}, error) { LightIngress int `toml:",omitempty"` LightEgress int `toml:",omitempty"` LightPeers int `toml:",omitempty"` + LightNoPrune bool `toml:",omitempty"` UltraLightServers []string `toml:",omitempty"` UltraLightFraction int `toml:",omitempty"` UltraLightOnlyAnnounce bool `toml:",omitempty"` @@ -66,6 +67,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.LightIngress = c.LightIngress enc.LightEgress = c.LightEgress enc.LightPeers = c.LightPeers + enc.LightNoPrune = c.LightNoPrune enc.UltraLightServers = c.UltraLightServers enc.UltraLightFraction = c.UltraLightFraction enc.UltraLightOnlyAnnounce = c.UltraLightOnlyAnnounce @@ -107,6 +109,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { LightIngress *int `toml:",omitempty"` LightEgress *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"` + LightNoPrune *bool `toml:",omitempty"` UltraLightServers []string `toml:",omitempty"` UltraLightFraction *int `toml:",omitempty"` UltraLightOnlyAnnounce *bool `toml:",omitempty"` @@ -171,6 +174,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.LightPeers != nil { c.LightPeers = *dec.LightPeers } + if dec.LightNoPrune != nil { + c.LightNoPrune = *dec.LightNoPrune + } if dec.UltraLightServers != nil { c.UltraLightServers = 
dec.UltraLightServers } diff --git a/graphql/graphql.go b/graphql/graphql.go index 6e29ccc6e..1479ae7fd 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -584,7 +584,7 @@ func (b *Block) TotalDifficulty(ctx context.Context) (hexutil.Big, error) { } h = header.Hash() } - return hexutil.Big(*b.backend.GetTd(h)), nil + return hexutil.Big(*b.backend.GetTd(ctx, h)), nil } // BlockNumberArgs encapsulates arguments to accessors that specify a block number. diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 99b94bd5c..bd4ea6fdd 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -36,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" @@ -625,7 +624,7 @@ func (s *PublicBlockChainAPI) GetProof(ctx context.Context, address common.Addre func (s *PublicBlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { header, err := s.b.HeaderByNumber(ctx, number) if header != nil && err == nil { - response := s.rpcMarshalHeader(header) + response := s.rpcMarshalHeader(ctx, header) if number == rpc.PendingBlockNumber { // Pending header need to nil out a few fields for _, field := range []string{"hash", "nonce", "miner"} { @@ -641,7 +640,7 @@ func (s *PublicBlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc. func (s *PublicBlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) map[string]interface{} { header, _ := s.b.HeaderByHash(ctx, hash) if header != nil { - return s.rpcMarshalHeader(header) + return s.rpcMarshalHeader(ctx, header) } return nil } @@ -654,7 +653,7 @@ func (s *PublicBlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.H func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) { block, err := s.b.BlockByNumber(ctx, number) if block != nil && err == nil { - response, err := s.rpcMarshalBlock(block, true, fullTx) + response, err := s.rpcMarshalBlock(ctx, block, true, fullTx) if err == nil && number == rpc.PendingBlockNumber { // Pending blocks need to nil out a few fields for _, field := range []string{"hash", "nonce", "miner"} { @@ -671,7 +670,7 @@ func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.B func (s *PublicBlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { block, err := s.b.BlockByHash(ctx, hash) if block != nil { - return s.rpcMarshalBlock(block, true, fullTx) + return s.rpcMarshalBlock(ctx, block, true, fullTx) } return nil, err } @@ -687,7 +686,7 @@ func (s *PublicBlockChainAPI) GetUncleByBlockNumberAndIndex(ctx context.Context, return nil, nil } block = types.NewBlockWithHeader(uncles[index]) - return s.rpcMarshalBlock(block, false, false) + return s.rpcMarshalBlock(ctx, block, false, false) } return nil, err } @@ -703,7 +702,7 @@ func (s *PublicBlockChainAPI) GetUncleByBlockHashAndIndex(ctx context.Context, b return nil, nil } block = types.NewBlockWithHeader(uncles[index]) - return s.rpcMarshalBlock(block, false, false) + return s.rpcMarshalBlock(ctx, block, false, false) } return nil, err } @@ -1173,21 +1172,21 @@ func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]i // 
rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires // a `PublicBlockchainAPI`. -func (s *PublicBlockChainAPI) rpcMarshalHeader(header *types.Header) map[string]interface{} { +func (s *PublicBlockChainAPI) rpcMarshalHeader(ctx context.Context, header *types.Header) map[string]interface{} { fields := RPCMarshalHeader(header) - fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(header.Hash())) + fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(ctx, header.Hash())) return fields } // rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires // a `PublicBlockchainAPI`. -func (s *PublicBlockChainAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { +func (s *PublicBlockChainAPI) rpcMarshalBlock(ctx context.Context, b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { fields, err := RPCMarshalBlock(b, inclTx, fullTx) if err != nil { return nil, err } if inclTx { - fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(b.Hash())) + fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(ctx, b.Hash())) } return fields, err } @@ -1393,8 +1392,8 @@ func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, // GetTransactionReceipt returns the transaction receipt for the given transaction hash. func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - tx, blockHash, blockNumber, index := rawdb.ReadTransaction(s.b.ChainDb(), hash) - if tx == nil { + tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) + if err != nil { return nil, nil } receipts, err := s.b.GetReceipts(ctx, blockHash) diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index dbdd35ac7..074cd794a 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -59,7 +59,7 @@ type Backend interface { StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) - GetTd(hash common.Hash) *big.Int + GetTd(ctx context.Context, hash common.Hash) *big.Int GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription diff --git a/les/api_backend.go b/les/api_backend.go index f72cbba07..448260a19 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -162,8 +162,11 @@ func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*typ return nil, nil } -func (b *LesApiBackend) GetTd(hash common.Hash) *big.Int { - return b.eth.blockchain.GetTdByHash(hash) +func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { + if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { + return b.eth.blockchain.GetTdOdr(ctx, hash, *number) + } + return nil } func (b *LesApiBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) { diff --git a/les/client.go b/les/client.go index 34a654e22..49b21c967 100644 --- a/les/client.go +++ b/les/client.go @@ -62,6 +62,7 @@ type LightEthereum struct { 
serverPool *serverPool valueTracker *lpc.ValueTracker dialCandidates enode.Iterator + pruner *pruner bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports @@ -121,8 +122,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { leth.relay = newLesTxRelay(peers, leth.retriever) leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) - leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations) - leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency) + leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations, config.LightNoPrune) + leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency, config.LightNoPrune) leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) checkpoint := config.Checkpoint @@ -149,6 +150,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { leth.chtIndexer.Start(leth.blockchain) leth.bloomIndexer.Start(leth.blockchain) + // Start a light chain pruner to delete useless historical data. + leth.pruner = newPruner(chainDb, leth.chtIndexer, leth.bloomTrieIndexer) + // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { log.Warn("Rewinding chain to upgrade configuration", "err", compat) @@ -302,6 +306,7 @@ func (s *LightEthereum) Stop() error { s.handler.stop() s.txPool.Stop() s.engine.Close() + s.pruner.close() s.eventMux.Stop() s.chainDb.Close() s.wg.Wait() diff --git a/les/odr_requests.go b/les/odr_requests.go index c4b38060c..8c1e0102f 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -110,14 +110,16 @@ func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error { body := bodies[0] // Retrieve our stored header and validate block content against it - header := rawdb.ReadHeader(db, r.Hash, r.Number) - if header == nil { + if r.Header == nil { + r.Header = rawdb.ReadHeader(db, r.Hash, r.Number) + } + if r.Header == nil { return errHeaderUnavailable } - if header.TxHash != types.DeriveSha(types.Transactions(body.Transactions)) { + if r.Header.TxHash != types.DeriveSha(types.Transactions(body.Transactions)) { return errTxHashMismatch } - if header.UncleHash != types.CalcUncleHash(body.Uncles) { + if r.Header.UncleHash != types.CalcUncleHash(body.Uncles) { return errUncleHashMismatch } // Validations passed, encode and store RLP diff --git a/les/odr_test.go b/les/odr_test.go index 01cc95695..d30642c4f 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -183,7 +183,7 @@ func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainCon // testOdr tests odr requests whose validation guaranteed by block headers. func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) { // Assemble the test environment - server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) + server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true) defer tearDown() // Ensure the client has synced all necessary data. 
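The new les/pruner.go below drives the garbage collection: nothing is deleted until every registered chain indexer has fully processed a section, and the most recent section is always retained as a safety margin. As a rough illustration of the threshold arithmetic it applies (a sketch only; the helper name pruneThreshold is hypothetical, the real logic lives inline in pruner.loop):

package les

import (
	"math"

	"github.com/ethereum/go-ethereum/core"
)

// pruneThreshold mirrors the arithmetic in pruner.loop: take the smallest
// fully-indexed section count across all indexers, keep the newest section
// as a safety margin, and hand min-2 to ChainIndexer.Prune as the threshold.
func pruneThreshold(indexers []*core.ChainIndexer) (uint64, bool) {
	min := uint64(math.MaxUint64)
	for _, indexer := range indexers {
		sections, _, _ := indexer.Sections()
		if sections < min {
			min = sections
		}
	}
	if len(indexers) == 0 || min < 2 {
		return 0, false // nothing can be pruned safely yet
	}
	return min - 2, true
}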
diff --git a/les/pruner.go b/les/pruner.go new file mode 100644 index 000000000..622e64868 --- /dev/null +++ b/les/pruner.go @@ -0,0 +1,98 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package les + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// pruner is responsible for pruning historical light chain data. +type pruner struct { + db ethdb.Database + indexers []*core.ChainIndexer + closeCh chan struct{} + wg sync.WaitGroup +} + +// newPruner returns a light chain pruner instance. +func newPruner(db ethdb.Database, indexers ...*core.ChainIndexer) *pruner { + pruner := &pruner{ + db: db, + indexers: indexers, + closeCh: make(chan struct{}), + } + pruner.wg.Add(1) + go pruner.loop() + return pruner +} + +// close notifies all background goroutines belonging to the pruner to exit. +func (p *pruner) close() { + close(p.closeCh) + p.wg.Wait() +} + +// loop periodically queries the status of the chain indexers and prunes stale +// historical chain data. Notably, whenever Geth restarts, it will iterate +// over all historical sections even if they don't exist at all (below the +// checkpoint), so that the light client can prune chain data that was cached +// via ODR after that section was pruned. +func (p *pruner) loop() { + defer p.wg.Done() + + // cleanTicker is the ticker used to trigger a history clean twice a day. + var cleanTicker = time.NewTicker(12 * time.Hour) + + // pruning finds the sections that have been fully processed by all indexers + // and deletes the historical chain data below them. + // Note: if an indexer doesn't support pruning (e.g. eth.BloomIndexer), + // its pruning operation is silently ignored. + pruning := func() { + min := uint64(math.MaxUint64) + for _, indexer := range p.indexers { + sections, _, _ := indexer.Sections() + if sections < min { + min = sections + } + } + // Always keep the latest section data in the database. + if min < 2 || len(p.indexers) == 0 { + return + } + for _, indexer := range p.indexers { + if err := indexer.Prune(min - 2); err != nil { + log.Debug("Failed to prune historical data", "err", err) + return + } + } + p.db.Compact(nil, nil) // Compact the entire database to ensure all removed data is actually deleted. + } + for { + pruning() + select { + case <-cleanTicker.C: + case <-p.closeCh: + return + } + } +} diff --git a/les/pruner_test.go b/les/pruner_test.go new file mode 100644 index 000000000..62b4e9a95 --- /dev/null +++ b/les/pruner_test.go @@ -0,0 +1,197 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package les + +import ( + "bytes" + "context" + "encoding/binary" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/light" +) + +func TestLightPruner(t *testing.T) { + config := light.TestClientIndexerConfig + + waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { + for { + cs, _, _ := cIndexer.Sections() + bts, _, _ := btIndexer.Sections() + if cs >= 3 && bts >= 3 { + break + } + time.Sleep(10 * time.Millisecond) + } + } + server, client, tearDown := newClientServerEnv(t, int(3*config.ChtSize+config.ChtConfirms), 2, waitIndexers, nil, 0, false, true, false) + defer tearDown() + + // checkDB iterates the chain with the given prefix, resolves the block number + // with the given callback and asserts whether the entry should exist or not. + checkDB := func(from, to uint64, prefix []byte, resolve func(key, value []byte) *uint64, exist bool) bool { + it := client.db.NewIterator(prefix, nil) + defer it.Release() + + var next = from + for it.Next() { + number := resolve(it.Key(), it.Value()) + if number == nil || *number < from { + continue + } else if *number > to { + return true + } + if exist { + if *number != next { + return false + } + next++ + } else { + return false + } + } + return true + } + // checkPruned checks and ensures the stale chain data has been pruned. + checkPruned := func(from, to uint64) { + // Iterate canonical hash + if !checkDB(from, to, []byte("h"), func(key, value []byte) *uint64 { + if len(key) == 1+8+1 && bytes.Equal(key[9:10], []byte("n")) { + n := binary.BigEndian.Uint64(key[1:9]) + return &n + } + return nil + }, false) { + t.Fatalf("canonical hash mappings are not properly pruned") + } + // Iterate header + if !checkDB(from, to, []byte("h"), func(key, value []byte) *uint64 { + if len(key) == 1+8+32 { + n := binary.BigEndian.Uint64(key[1:9]) + return &n + } + return nil + }, false) { + t.Fatalf("headers are not properly pruned") + } + // Iterate body + if !checkDB(from, to, []byte("b"), func(key, value []byte) *uint64 { + if len(key) == 1+8+32 { + n := binary.BigEndian.Uint64(key[1:9]) + return &n + } + return nil + }, false) { + t.Fatalf("block bodies are not properly pruned") + } + // Iterate receipts + if !checkDB(from, to, []byte("r"), func(key, value []byte) *uint64 { + if len(key) == 1+8+32 { + n := binary.BigEndian.Uint64(key[1:9]) + return &n + } + return nil + }, false) { + t.Fatalf("receipts are not properly pruned") + } + // Iterate td + if !checkDB(from, to, []byte("h"), func(key, value []byte) *uint64 { + if len(key) == 1+8+32+1 && bytes.Equal(key[41:42], []byte("t")) { + n := binary.BigEndian.Uint64(key[1:9]) + return &n + } + return nil + }, false) { + t.Fatalf("tds are not properly pruned") + } + } + // Start the light pruner. 
+ time.Sleep(1500 * time.Millisecond) // Ensure the light client has finished syncing and indexing + newPruner(client.db, client.chtIndexer, client.bloomTrieIndexer) + + time.Sleep(1500 * time.Millisecond) // Give the pruner enough time to prune the data. + checkPruned(1, config.ChtSize-1) + + // Ensure all APIs still work after pruning. + var cases = []struct { + from, to uint64 + methodName string + method func(uint64) bool + }{ + { + 1, 10, "GetHeaderByNumber", + func(n uint64) bool { + _, err := light.GetHeaderByNumber(context.Background(), client.handler.backend.odr, n) + return err == nil + }, + }, + { + 11, 20, "GetCanonicalHash", + func(n uint64) bool { + _, err := light.GetCanonicalHash(context.Background(), client.handler.backend.odr, n) + return err == nil + }, + }, + { + 21, 30, "GetTd", + func(n uint64) bool { + _, err := light.GetTd(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n) + return err == nil + }, + }, + { + 31, 40, "GetBodyRLP", + func(n uint64) bool { + _, err := light.GetBodyRLP(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n) + return err == nil + }, + }, + { + 41, 50, "GetBlock", + func(n uint64) bool { + _, err := light.GetBlock(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n) + return err == nil + }, + }, + { + 51, 60, "GetBlockReceipts", + func(n uint64) bool { + _, err := light.GetBlockReceipts(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n) + return err == nil + }, + }, + } + for _, c := range cases { + for i := c.from; i <= c.to; i++ { + if !c.method(i) { + t.Fatalf("rpc method %s failed, number %d", c.methodName, i) + } + } + } + // Check GetBloombits + _, err := light.GetBloomBits(context.Background(), client.handler.backend.odr, 0, []uint64{0}) + if err != nil { + t.Fatalf("Failed to retrieve bloombits of pruned section: %v", err) + } + + // Ensure the ODR-cached data can be cleaned by the pruner. + newPruner(client.db, client.chtIndexer, client.bloomTrieIndexer) + time.Sleep(50 * time.Millisecond) // Give the pruner enough time to prune the data. + checkPruned(1, config.ChtSize-1) // Ensure all data cached via ODR is cleaned. +} diff --git a/les/request_test.go b/les/request_test.go index e20b06fda..485127438 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -79,7 +79,7 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment - server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) + server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true) defer tearDown() // Ensure the client has synced all necessary data. 
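The per-section deletion that ChainIndexer.Prune eventually triggers is implemented in light/postprocess.go, which appears in the diffstat above but falls outside this excerpt. It builds on the rawdb helpers introduced earlier: ReadAllCanonicalHashes enumerates a canonical range, and the pre-existing rawdb.Delete* accessors drop each entry. A hedged sketch of that pattern (the function name pruneBlockRange and the batch size of 1024 are illustrative, and the real hunk may differ in detail):

package light

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// pruneBlockRange sketches how one span of historical chain data can be
// deleted in bounded batches, the way a section is pruned after its CHT
// has been committed.
func pruneBlockRange(db ethdb.Database, from, to uint64) {
	for {
		// Enumerate at most 1024 canonical mappings per pass so one
		// iteration stays cheap even for large ranges.
		numbers, hashes := rawdb.ReadAllCanonicalHashes(db, from, to, 1024)
		if len(numbers) == 0 {
			return
		}
		for i := range numbers {
			rawdb.DeleteBody(db, hashes[i], numbers[i])
			rawdb.DeleteReceipts(db, hashes[i], numbers[i])
			rawdb.DeleteTd(db, hashes[i], numbers[i])
			// DeleteHeader also removes the hash->number mapping.
			rawdb.DeleteHeader(db, hashes[i], numbers[i])
			rawdb.DeleteCanonicalHash(db, numbers[i])
		}
		from = numbers[len(numbers)-1] + 1
	}
}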
diff --git a/les/server.go b/les/server.go index 4b623f61e..a154571b4 100644 --- a/les/server.go +++ b/les/server.go @@ -77,8 +77,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { iConfig: light.DefaultServerIndexerConfig, chainDb: e.ChainDb(), chainReader: e.BlockChain(), - chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations), - bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), + chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true), + bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true), closeCh: make(chan struct{}), }, archiveMode: e.ArchiveMode(), diff --git a/les/sync_test.go b/les/sync_test.go index c128a8c9f..ffce4d8df 100644 --- a/les/sync_test.go +++ b/les/sync_test.go @@ -54,7 +54,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { } } // Generate 512+4 blocks (totally 1 CHT sections) - server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false) + server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false, true) defer tearDown() expected := config.ChtSize + config.ChtConfirms @@ -144,7 +144,7 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) { } } // Generate 512+4 blocks (totally 1 CHT sections) - server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false) + server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true) defer tearDown() expected := config.ChtSize + config.ChtConfirms diff --git a/les/test_helper.go b/les/test_helper.go index 2a2bbb440..28906f1f1 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -158,11 +158,11 @@ func prepare(n int, backend *backends.SimulatedBackend) { } // testIndexers creates a set of indexers with specified params for testing purpose. -func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig) []*core.ChainIndexer { +func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig, disablePruning bool) []*core.ChainIndexer { var indexers [3]*core.ChainIndexer - indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms) + indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms, disablePruning) indexers[1] = eth.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms) - indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize) + indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize, disablePruning) // make bloomTrieIndexer as a child indexer of bloom indexer. 
indexers[1].AddChildIndexer(indexers[2]) return indexers[:] @@ -456,7 +456,7 @@ type testServer struct { func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, simClock bool, newPeer bool, testCost uint64) (*testServer, func()) { db := rawdb.NewMemoryDatabase() - indexers := testIndexers(db, nil, light.TestServerIndexerConfig) + indexers := testIndexers(db, nil, light.TestServerIndexerConfig, true) var clock mclock.Clock = &mclock.System{} if simClock { @@ -499,7 +499,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallba return server, teardown } -func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool) (*testServer, *testClient, func()) { +func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool, disablePruning bool) (*testServer, *testClient, func()) { sdb, cdb := rawdb.NewMemoryDatabase(), rawdb.NewMemoryDatabase() speers, cpeers := newServerPeerSet(), newClientPeerSet() @@ -511,8 +511,8 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer rm := newRetrieveManager(speers, dist, func() time.Duration { return time.Millisecond * 500 }) odr := NewLesOdr(cdb, light.TestClientIndexerConfig, rm) - sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig) - cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig) + sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig, true) + cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, disablePruning) scIndexer, sbIndexer, sbtIndexer := sindexers[0], sindexers[1], sindexers[2] ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2] @@ -542,7 +542,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer } select { case <-done: - case <-time.After(3 * time.Second): - t.Fatal("test peer did not connect and sync within 3s") + case <-time.After(10 * time.Second): + t.Fatal("test peer did not connect and sync within 10s") } } diff --git a/les/ulc_test.go b/les/ulc_test.go index 273c63e4b..657b13db2 100644 --- a/les/ulc_test.go +++ b/les/ulc_test.go @@ -138,6 +138,6 @@ func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *en // newTestLightPeer creates node with light sync mode func newTestLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) { - _, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false) + _, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false, true) return c, teardown } diff --git a/light/lightchain.go b/light/lightchain.go index 79eba62c9..6fc321ae0 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -112,7 +112,7 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. 
if header := bc.GetHeaderByHash(hash); header != nil { log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) bc.SetHead(header.Number.Uint64() - 1) - log.Error("Chain rewind was successful, resuming normal operation") + log.Info("Chain rewind was successful, resuming normal operation") } } return bc, nil @@ -155,7 +155,11 @@ func (lc *LightChain) loadLastState() error { // Corrupt or empty database, init from scratch lc.Reset() } else { - if header := lc.GetHeaderByHash(head); header != nil { + header := lc.GetHeaderByHash(head) + if header == nil { + // Corrupt or empty database, init from scratch + lc.Reset() + } else { lc.hc.SetCurrentHeader(header) } } @@ -163,7 +167,6 @@ func (lc *LightChain) loadLastState() error { header := lc.hc.CurrentHeader() headerTd := lc.GetTd(header.Hash(), header.Number.Uint64()) log.Info("Loaded most recent local header", "number", header.Number, "hash", header.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(header.Time), 0))) - return nil } @@ -431,6 +434,17 @@ func (lc *LightChain) GetTdByHash(hash common.Hash) *big.Int { return lc.hc.GetTdByHash(hash) } +// GetTdOdr retrieves the total difficulty from the database or network by +// hash and number, caching it (associated with its hash) if found. +func (lc *LightChain) GetTdOdr(ctx context.Context, hash common.Hash, number uint64) *big.Int { + td := lc.GetTd(hash, number) + if td != nil { + return td + } + td, _ = GetTd(ctx, lc.odr, hash, number) + return td +} + // GetHeader retrieves a block header from the database by hash and number, // caching it if found. func (lc *LightChain) GetHeader(hash common.Hash, number uint64) *types.Header { diff --git a/light/odr.go b/light/odr.go index 907712ede..1ea98ca5a 100644 --- a/light/odr.go +++ b/light/odr.go @@ -82,7 +82,6 @@ func StorageTrieID(state *TrieID, addrHash, root common.Hash) *TrieID { // TrieRequest is the ODR request type for state/storage trie entries type TrieRequest struct { - OdrRequest Id *TrieID Key []byte Proof *NodeSet } @@ -95,7 +94,6 @@ func (req *TrieRequest) StoreResult(db ethdb.Database) { // CodeRequest is the ODR request type for retrieving contract code type CodeRequest struct { - OdrRequest Id *TrieID // references storage trie of the account Hash common.Hash Data []byte } @@ -108,9 +106,9 @@ func (req *CodeRequest) StoreResult(db ethdb.Database) { // BlockRequest is the ODR request type for retrieving block bodies type BlockRequest struct { - OdrRequest Hash common.Hash Number uint64 + Header *types.Header Rlp []byte } @@ -119,9 +117,8 @@ func (req *BlockRequest) StoreResult(db ethdb.Database) { rawdb.WriteBodyRLP(db, req.Hash, req.Number, req.Rlp) } -// ReceiptsRequest is the ODR request type for retrieving block bodies +// ReceiptsRequest is the ODR request type for retrieving receipts. type ReceiptsRequest struct { - OdrRequest Untrusted bool // Indicator whether the result retrieved is trusted or not Hash common.Hash Number uint64 @@ -138,7 +135,6 @@ func (req *ReceiptsRequest) StoreResult(db ethdb.Database) { // ChtRequest is the ODR request type for state/storage trie entries type ChtRequest struct { - OdrRequest Untrusted bool // Indicator whether the result retrieved is trusted or not PeerId string // The specified peer id from which to retrieve data. 
@@ -193,7 +189,6 @@ type TxStatus struct {

 // TxStatusRequest is the ODR request type for retrieving transaction status
 type TxStatusRequest struct {
-	OdrRequest
 	Hashes []common.Hash
 	Status []TxStatus
 }
diff --git a/light/odr_util.go b/light/odr_util.go
index 2c820d40c..aec0c7b69 100644
--- a/light/odr_util.go
+++ b/light/odr_util.go
@@ -19,6 +19,7 @@ package light
 import (
 	"bytes"
 	"context"
+	"math/big"

 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
@@ -30,65 +31,83 @@ import (

 var sha3Nil = crypto.Keccak256Hash(nil)

+// GetHeaderByNumber retrieves the canonical block header corresponding to the
+// given number.
 func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*types.Header, error) {
+	// Try to find it in the local database first.
 	db := odr.Database()
 	hash := rawdb.ReadCanonicalHash(db, number)
-	if (hash != common.Hash{}) {
-		// if there is a canonical hash, there is a header too
-		header := rawdb.ReadHeader(db, hash, number)
-		if header == nil {
-			panic("Canonical hash present but header not found")
-		}
-		return header, nil
-	}
-	var (
-		chtCount, sectionHeadNum uint64
-		sectionHead              common.Hash
-	)
-	if odr.ChtIndexer() != nil {
-		chtCount, sectionHeadNum, sectionHead = odr.ChtIndexer().Sections()
-		canonicalHash := rawdb.ReadCanonicalHash(db, sectionHeadNum)
-		// if the CHT was injected as a trusted checkpoint, we have no canonical hash yet so we accept zero hash too
-		for chtCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
-			chtCount--
-			if chtCount > 0 {
-				sectionHeadNum = chtCount*odr.IndexerConfig().ChtSize - 1
-				sectionHead = odr.ChtIndexer().SectionHead(chtCount - 1)
-				canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
-			}
+	// If there is a canonical hash, there should be a header too.
+	// But if it has been pruned, re-fetch it from the network.
+	if (hash != common.Hash{}) {
+		if header := rawdb.ReadHeader(db, hash, number); header != nil {
+			return header, nil
 		}
 	}
-	if number >= chtCount*odr.IndexerConfig().ChtSize {
+	// Retrieve the header via ODR, ensuring the requested header is covered
+	// by the local trusted CHT.
+	chts, _, chtHead := odr.ChtIndexer().Sections()
+	if number >= chts*odr.IndexerConfig().ChtSize {
 		return nil, errNoTrustedCht
 	}
-	r := &ChtRequest{ChtRoot: GetChtRoot(db, chtCount-1, sectionHead), ChtNum: chtCount - 1, BlockNum: number, Config: odr.IndexerConfig()}
+	r := &ChtRequest{
+		ChtRoot:  GetChtRoot(db, chts-1, chtHead),
+		ChtNum:   chts - 1,
+		BlockNum: number,
+		Config:   odr.IndexerConfig(),
+	}
 	if err := odr.Retrieve(ctx, r); err != nil {
 		return nil, err
 	}
 	return r.Header, nil
 }

-// GetUntrustedHeaderByNumber fetches specified block header without correctness checking.
-// Note this function should only be used in light client checkpoint syncing.
+// GetUntrustedHeaderByNumber retrieves the specified block header without
+// correctness checking. Note this function should only be used in light
+// client checkpoint syncing.
 func GetUntrustedHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64, peerId string) (*types.Header, error) {
-	r := &ChtRequest{BlockNum: number, ChtNum: number / odr.IndexerConfig().ChtSize, Untrusted: true, PeerId: peerId, Config: odr.IndexerConfig()}
+	// todo(rjl493456442) it's a hack to retrieve headers which are not covered
+	// by the CHT. Fix it in LES4.
+	r := &ChtRequest{
+		BlockNum:  number,
+		ChtNum:    number / odr.IndexerConfig().ChtSize,
+		Untrusted: true,
+		PeerId:    peerId,
+		Config:    odr.IndexerConfig(),
+	}
 	if err := odr.Retrieve(ctx, r); err != nil {
 		return nil, err
 	}
 	return r.Header, nil
 }

+// GetCanonicalHash retrieves the canonical block hash corresponding to the number.
 func GetCanonicalHash(ctx context.Context, odr OdrBackend, number uint64) (common.Hash, error) {
 	hash := rawdb.ReadCanonicalHash(odr.Database(), number)
-	if (hash != common.Hash{}) {
+	if hash != (common.Hash{}) {
 		return hash, nil
 	}
 	header, err := GetHeaderByNumber(ctx, odr, number)
-	if header != nil {
-		return header.Hash(), nil
+	if err != nil {
+		return common.Hash{}, err
+	}
+	// The number->hash mapping has now been stored in the database; return the hash.
+	return header.Hash(), nil
+}
+
+// GetTd retrieves the total difficulty corresponding to the number and hash.
+func GetTd(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*big.Int, error) {
+	td := rawdb.ReadTd(odr.Database(), hash, number)
+	if td != nil {
+		return td, nil
+	}
+	_, err := GetHeaderByNumber(ctx, odr, number)
+	if err != nil {
+		return nil, err
 	}
-	return common.Hash{}, err
+	// The hash->td mapping has now been stored in the database; read it back.
+	return rawdb.ReadTd(odr.Database(), hash, number), nil
 }

 // GetBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
@@ -96,15 +115,19 @@ func GetBodyRLP(ctx context.Context, odr OdrBackend, hash common.Hash, number ui
 	if data := rawdb.ReadBodyRLP(odr.Database(), hash, number); data != nil {
 		return data, nil
 	}
-	r := &BlockRequest{Hash: hash, Number: number}
+	// Retrieve the block header first and pass it for verification.
+	header, err := GetHeaderByNumber(ctx, odr, number)
+	if err != nil {
+		return nil, errNoHeader
+	}
+	r := &BlockRequest{Hash: hash, Number: number, Header: header}
 	if err := odr.Retrieve(ctx, r); err != nil {
 		return nil, err
-	} else {
-		return r.Rlp, nil
 	}
+	return r.Rlp, nil
 }

-// GetBody retrieves the block body (transactons, uncles) corresponding to the
+// GetBody retrieves the block body (transactions, uncles) corresponding to the
 // hash.
 func GetBody(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*types.Body, error) {
 	data, err := GetBodyRLP(ctx, odr, hash, number)
@@ -122,8 +145,8 @@ func GetBody(ctx context.Context, odr OdrBackend, hash common.Hash, number uint6
 // back from the stored header and body.
 func GetBlock(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*types.Block, error) {
 	// Retrieve the block header and body contents
-	header := rawdb.ReadHeader(odr.Database(), hash, number)
-	if header == nil {
+	header, err := GetHeaderByNumber(ctx, odr, number)
+	if err != nil {
 		return nil, errNoHeader
 	}
 	body, err := GetBody(ctx, odr, hash, number)
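The helpers above all follow the same local-first pattern: read from the database if the data survived pruning, otherwise issue a CHT-verified ODR request that re-populates the database as a side effect. A hedged usage sketch (printTd is illustrative only; light.GetTd and light.OdrBackend are the names introduced above):

```go
package example

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/light"
)

// printTd resolves a block's total difficulty, transparently falling back
// to a network retrieval when the local entry has been garbage collected.
func printTd(ctx context.Context, odr light.OdrBackend, hash common.Hash, number uint64) error {
	td, err := light.GetTd(ctx, odr, hash, number)
	if err != nil {
		return err // e.g. the block is not yet covered by a trusted CHT
	}
	fmt.Printf("block %d has total difficulty %v\n", number, td)
	return nil
}
```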
@@ -140,7 +163,11 @@ func GetBlockReceipts(ctx context.Context, odr OdrBackend, hash common.Hash, num
 	// Assume receipts are already stored locally and attempt to retrieve.
 	receipts := rawdb.ReadRawReceipts(odr.Database(), hash, number)
 	if receipts == nil {
-		r := &ReceiptsRequest{Hash: hash, Number: number}
+		header, err := GetHeaderByNumber(ctx, odr, number)
+		if err != nil {
+			return nil, errNoHeader
+		}
+		r := &ReceiptsRequest{Hash: hash, Number: number, Header: header}
 		if err := odr.Retrieve(ctx, r); err != nil {
 			return nil, err
 		}
@@ -171,7 +198,6 @@ func GetBlockLogs(ctx context.Context, odr OdrBackend, hash common.Hash, number
 	if err != nil {
 		return nil, err
 	}
-	// Return the logs without deriving any computed fields on the receipts
 	logs := make([][]*types.Log, len(receipts))
 	for i, receipt := range receipts {
 		logs[i] = receipt.Logs
@@ -203,64 +229,51 @@ func GetUntrustedBlockLogs(ctx context.Context, odr OdrBackend, header *types.He
 	return logs, nil
 }

-// GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to the given bit index and section indexes
-func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxList []uint64) ([][]byte, error) {
-	var (
-		db      = odr.Database()
-		result  = make([][]byte, len(sectionIdxList))
-		reqList []uint64
-		reqIdx  []int
-	)
-
+// GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to
+// the given bit index and section indexes.
+func GetBloomBits(ctx context.Context, odr OdrBackend, bit uint, sections []uint64) ([][]byte, error) {
 	var (
-		bloomTrieCount, sectionHeadNum uint64
-		sectionHead                    common.Hash
+		reqIndex    []int
+		reqSections []uint64
+		db          = odr.Database()
+		result      = make([][]byte, len(sections))
 	)
-	if odr.BloomTrieIndexer() != nil {
-		bloomTrieCount, sectionHeadNum, sectionHead = odr.BloomTrieIndexer().Sections()
-		canonicalHash := rawdb.ReadCanonicalHash(db, sectionHeadNum)
-		// if the BloomTrie was injected as a trusted checkpoint, we have no canonical hash yet so we accept zero hash too
-		for bloomTrieCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
-			bloomTrieCount--
-			if bloomTrieCount > 0 {
-				sectionHeadNum = bloomTrieCount*odr.IndexerConfig().BloomTrieSize - 1
-				sectionHead = odr.BloomTrieIndexer().SectionHead(bloomTrieCount - 1)
-				canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
-			}
-		}
-	}
-
-	for i, sectionIdx := range sectionIdxList {
-		sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*odr.IndexerConfig().BloomSize-1)
-		// if we don't have the canonical hash stored for this section head number, we'll still look for
-		// an entry with a zero sectionHead (we store it with zero section head too if we don't know it
-		// at the time of the retrieval)
-		bloomBits, err := rawdb.ReadBloomBits(db, bitIdx, sectionIdx, sectionHead)
-		if err == nil {
+	blooms, _, sectionHead := odr.BloomTrieIndexer().Sections()
+	for i, section := range sections {
+		sectionHead := rawdb.ReadCanonicalHash(db, (section+1)*odr.IndexerConfig().BloomSize-1)
+		// If we don't have the canonical hash stored for this section head number,
+		// we'll still look for an entry with a zero sectionHead (we store it with
+		// zero section head too if we don't know it at the time of the retrieval)
+		if bloomBits, _ := rawdb.ReadBloomBits(db, bit, section, sectionHead); len(bloomBits) != 0 {
 			result[i] = bloomBits
-		} else {
-			// TODO(rjl493456442) Convert sectionIndex to BloomTrie relative index
-			if sectionIdx >= bloomTrieCount {
-				return nil, errNoTrustedBloomTrie
-			}
-			reqList = append(reqList, sectionIdx)
-			reqIdx = append(reqIdx, i)
+			continue
 		}
+		// TODO(rjl493456442) Convert sectionIndex to BloomTrie relative index
+		if section >= blooms {
+			return nil, errNoTrustedBloomTrie
+		}
+		reqSections = append(reqSections, section)
+		reqIndex = append(reqIndex, i)
 	}
-	if reqList == nil {
+	// All bloombits were found in the database, nothing to query via ODR.
+	if reqSections == nil {
 		return result, nil
 	}
-
-	r := &BloomRequest{BloomTrieRoot: GetBloomTrieRoot(db, bloomTrieCount-1, sectionHead), BloomTrieNum: bloomTrieCount - 1,
-		BitIdx: bitIdx, SectionIndexList: reqList, Config: odr.IndexerConfig()}
+	// Send an ODR request to retrieve the missing bloombits.
+	r := &BloomRequest{
+		BloomTrieRoot:    GetBloomTrieRoot(db, blooms-1, sectionHead),
+		BloomTrieNum:     blooms - 1,
+		BitIdx:           bit,
+		SectionIndexList: reqSections,
+		Config:           odr.IndexerConfig(),
+	}
 	if err := odr.Retrieve(ctx, r); err != nil {
 		return nil, err
-	} else {
-		for i, idx := range reqIdx {
-			result[idx] = r.BloomBits[i]
-		}
-		return result, nil
 	}
+	for i, idx := range reqIndex {
+		result[idx] = r.BloomBits[i]
+	}
+	return result, nil
 }

 // GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain
@@ -268,17 +281,16 @@ func GetTransaction(ctx context.Context, odr OdrBackend, txHash common.Hash) (*t
 	r := &TxStatusRequest{Hashes: []common.Hash{txHash}}
 	if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded {
 		return nil, common.Hash{}, 0, 0, err
-	} else {
-		pos := r.Status[0].Lookup
-		// first ensure that we have the header, otherwise block body retrieval will fail
-		// also verify if this is a canonical block by getting the header by number and checking its hash
-		if header, err := GetHeaderByNumber(ctx, odr, pos.BlockIndex); err != nil || header.Hash() != pos.BlockHash {
-			return nil, common.Hash{}, 0, 0, err
-		}
-		if body, err := GetBody(ctx, odr, pos.BlockHash, pos.BlockIndex); err != nil || uint64(len(body.Transactions)) <= pos.Index || body.Transactions[pos.Index].Hash() != txHash {
-			return nil, common.Hash{}, 0, 0, err
-		} else {
-			return body.Transactions[pos.Index], pos.BlockHash, pos.BlockIndex, pos.Index, nil
-		}
 	}
+	pos := r.Status[0].Lookup
+	// First ensure that we have the header, otherwise block body retrieval will fail;
+	// also verify that this is a canonical block by getting the header by number and checking its hash.
+	if header, err := GetHeaderByNumber(ctx, odr, pos.BlockIndex); err != nil || header.Hash() != pos.BlockHash {
+		return nil, common.Hash{}, 0, 0, err
+	}
+	body, err := GetBody(ctx, odr, pos.BlockHash, pos.BlockIndex)
+	if err != nil || uint64(len(body.Transactions)) <= pos.Index || body.Transactions[pos.Index].Hash() != txHash {
+		return nil, common.Hash{}, 0, 0, err
+	}
+	return body.Transactions[pos.Index], pos.BlockHash, pos.BlockIndex, pos.Index, nil
 }
diff --git a/light/postprocess.go b/light/postprocess.go
index af3b25792..8efc8d656 100644
--- a/light/postprocess.go
+++ b/light/postprocess.go
@@ -17,6 +17,7 @@
 package light

 import (
+	"bytes"
 	"context"
 	"encoding/binary"
 	"errors"
@@ -24,6 +25,7 @@ import (
 	"math/big"
 	"time"

+	"github.com/deckarep/golang-set"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/bitutil"
 	"github.com/ethereum/go-ethereum/core"
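The indexer changes below keep a mapset.Set of the trie nodes written by the most recent commit, then sweep the trie table and delete everything not in that set. A self-contained sketch of that bookkeeping, under the assumption that the committed hashes arrive as a slice (markLive and isStale are illustrative helpers, not patch code):

```go
package example

import (
	mapset "github.com/deckarep/golang-set"
	"github.com/ethereum/go-ethereum/common"
)

// markLive collects the hashes of all trie nodes flushed by the latest
// commit into a set; this is the "mark" phase of the pruning below.
func markLive(committed []common.Hash) mapset.Set {
	live := mapset.NewSet()
	for _, hash := range committed {
		live.Add(hash)
	}
	return live
}

// isStale reports whether a stored node missed the latest commit and can
// therefore be deleted; this is the "sweep" phase.
func isStale(live mapset.Set, stored common.Hash) bool {
	return !live.Contains(stored)
}
```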
@@ -128,23 +130,27 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common

 // ChtIndexerBackend implements core.ChainIndexerBackend.
 type ChtIndexerBackend struct {
+	disablePruning       bool
 	diskdb, trieTable    ethdb.Database
 	odr                  OdrBackend
 	triedb               *trie.Database
+	trieset              mapset.Set
 	section, sectionSize uint64
 	lastHash             common.Hash
 	trie                 *trie.Trie
 }

 // NewChtIndexer creates a Cht chain indexer
-func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64) *core.ChainIndexer {
+func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, disablePruning bool) *core.ChainIndexer {
 	trieTable := rawdb.NewTable(db, ChtTablePrefix)
 	backend := &ChtIndexerBackend{
-		diskdb:      db,
-		odr:         odr,
-		trieTable:   trieTable,
-		triedb:      trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
-		sectionSize: size,
+		diskdb:         db,
+		odr:            odr,
+		trieTable:      trieTable,
+		triedb:         trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
+		trieset:        mapset.NewSet(),
+		sectionSize:    size,
+		disablePruning: disablePruning,
 	}
 	return core.NewChainIndexer(db, rawdb.NewTable(db, "chtIndexV2-"), backend, size, confirms, time.Millisecond*100, "cht")
 }
@@ -189,7 +195,6 @@ func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSecti
 			c.trie, err = trie.New(root, c.triedb)
 		}
 	}
-
 	c.section = section
 	return err
 }
@@ -216,13 +221,83 @@ func (c *ChtIndexerBackend) Commit() error {
 	if err != nil {
 		return err
 	}
-	c.triedb.Commit(root, false)
-
+	// Prune historical trie nodes if necessary.
+	if !c.disablePruning {
+		// Flush the triedb and track the latest trie nodes.
+		c.trieset.Clear()
+		c.triedb.Commit(root, false, func(hash common.Hash) { c.trieset.Add(hash) })
+
+		it := c.trieTable.NewIterator(nil, nil)
+		defer it.Release()
+
+		var (
+			deleted   int
+			remaining int
+			t         = time.Now()
+		)
+		for it.Next() {
+			trimmed := bytes.TrimPrefix(it.Key(), []byte(ChtTablePrefix))
+			if !c.trieset.Contains(common.BytesToHash(trimmed)) {
+				c.trieTable.Delete(trimmed)
+				deleted++
+			} else {
+				remaining++
+			}
+		}
+		log.Debug("Pruned historical CHT trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
+	} else {
+		c.triedb.Commit(root, false, nil)
+	}
 	log.Info("Storing CHT", "section", c.section, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
 	StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
 	return nil
 }

+// Prune implements core.ChainIndexerBackend which deletes all chain data
+// (except hash<->number mappings) older than the specified threshold.
+func (c *ChtIndexerBackend) Prune(threshold uint64) error {
+	// Short circuit if the light pruning is disabled.
+	if c.disablePruning {
+		return nil
+	}
+	t := time.Now()
+	// Always keep the genesis header in the database.
+	start, end := uint64(1), (threshold+1)*c.sectionSize
+
+	var batch = c.diskdb.NewBatch()
+	for {
+		numbers, hashes := rawdb.ReadAllCanonicalHashes(c.diskdb, start, end, 10240)
+		if len(numbers) == 0 {
+			break
+		}
+		for i := 0; i < len(numbers); i++ {
+			// Keep the hash<->number mapping in the database, otherwise the
+			// hash-based APIs (e.g. GetReceipt, GetLogs) would break.
+			//
+			// Storage-wise a single mapping is ~41 bytes, so a whole section
+			// amounts to about 1.3MB, which is acceptable.
+			//
+			// To get rid of this index entirely, an additional flag would be
+			// needed to specify how much historical data the light client can serve.
+			rawdb.DeleteCanonicalHash(batch, numbers[i])
+			rawdb.DeleteBlockWithoutNumber(batch, hashes[i], numbers[i])
+		}
+		if batch.ValueSize() > ethdb.IdealBatchSize {
+			if err := batch.Write(); err != nil {
+				return err
+			}
+			batch.Reset()
+		}
+		start = numbers[len(numbers)-1] + 1
+	}
+	if err := batch.Write(); err != nil {
+		return err
+	}
+	log.Debug("Pruned historical headers", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(t)))
+	return nil
+}
+
 var (
 	bloomTriePrefix      = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash
 	BloomTrieTablePrefix = "blt-"
@@ -245,8 +320,10 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root

 // BloomTrieIndexerBackend implements core.ChainIndexerBackend
 type BloomTrieIndexerBackend struct {
+	disablePruning    bool
 	diskdb, trieTable ethdb.Database
 	triedb            *trie.Database
+	trieset           mapset.Set
 	odr               OdrBackend
 	section           uint64
 	parentSize        uint64
@@ -257,15 +334,17 @@ type BloomTrieIndexerBackend struct {
 }

 // NewBloomTrieIndexer creates a BloomTrie chain indexer
-func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64) *core.ChainIndexer {
+func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64, disablePruning bool) *core.ChainIndexer {
 	trieTable := rawdb.NewTable(db, BloomTrieTablePrefix)
 	backend := &BloomTrieIndexerBackend{
-		diskdb:     db,
-		odr:        odr,
-		trieTable:  trieTable,
-		triedb:     trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
-		parentSize: parentSize,
-		size:       size,
+		diskdb:         db,
+		odr:            odr,
+		trieTable:      trieTable,
+		triedb:         trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
+		trieset:        mapset.NewSet(),
+		parentSize:     parentSize,
+		size:           size,
+		disablePruning: disablePruning,
 	}
 	backend.bloomTrieRatio = size / parentSize
 	backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
@@ -303,7 +382,6 @@ func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section
 			}
 		}()
 	}
-
 	for i := uint(0); i < types.BloomBitLength; i++ {
 		indexCh <- i
 	}
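The header pruner above deletes in batches, flushing whenever the pending writes exceed ethdb.IdealBatchSize so memory stays bounded regardless of how much history is dropped. A sketch of that write pattern in isolation (pruneKeys and its key list are stand-ins, not patch code):

```go
package example

import "github.com/ethereum/go-ethereum/ethdb"

// pruneKeys deletes the given keys, flushing the batch periodically the
// same way the CHT pruner flushes its canonical-hash deletions.
func pruneKeys(db ethdb.Database, keys [][]byte) error {
	batch := db.NewBatch()
	for _, key := range keys {
		if err := batch.Delete(key); err != nil {
			return err
		}
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				return err
			}
			batch.Reset()
		}
	}
	return batch.Write() // flush whatever is left
}
```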
@@ -380,10 +458,51 @@ func (b *BloomTrieIndexerBackend) Commit() error {
 	if err != nil {
 		return err
 	}
-	b.triedb.Commit(root, false)
-
+	// Prune historical trie nodes if necessary.
+	if !b.disablePruning {
+		// Flush the triedb and track the latest trie nodes.
+		b.trieset.Clear()
+		b.triedb.Commit(root, false, func(hash common.Hash) { b.trieset.Add(hash) })
+
+		it := b.trieTable.NewIterator(nil, nil)
+		defer it.Release()
+
+		var (
+			deleted   int
+			remaining int
+			t         = time.Now()
+		)
+		for it.Next() {
+			trimmed := bytes.TrimPrefix(it.Key(), []byte(BloomTrieTablePrefix))
+			if !b.trieset.Contains(common.BytesToHash(trimmed)) {
+				b.trieTable.Delete(trimmed)
+				deleted++
+			} else {
+				remaining++
+			}
+		}
+		log.Debug("Pruned historical bloom trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
+	} else {
+		b.triedb.Commit(root, false, nil)
+	}
 	sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
-	log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
 	StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
+	log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
+
+	return nil
+}
+
+// Prune implements core.ChainIndexerBackend which deletes all bloombits
+// older than the specified threshold.
+func (b *BloomTrieIndexerBackend) Prune(threshold uint64) error {
+	// Short circuit if the light pruning is disabled.
+	if b.disablePruning {
+		return nil
+	}
+	start := time.Now()
+	for i := uint(0); i < types.BloomBitLength; i++ {
+		rawdb.DeleteBloombits(b.diskdb, i, 0, threshold*b.bloomTrieRatio+b.bloomTrieRatio)
+	}
+	log.Debug("Pruned historical bloombits", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(start)))
 	return nil
 }
diff --git a/params/network_params.go b/params/network_params.go
index ab2e845a4..9311b5e2d 100644
--- a/params/network_params.go
+++ b/params/network_params.go
@@ -53,9 +53,15 @@ const (
 	// CheckpointProcessConfirmations is the number before a checkpoint is generated
 	CheckpointProcessConfirmations = 256

-	// ImmutabilityThreshold is the number of blocks after which a chain segment is
+	// FullImmutabilityThreshold is the number of blocks after which a chain segment is
 	// considered immutable (i.e. soft finality). It is used by the downloader as a
 	// hard limit against deep ancestors, by the blockchain against deep reorgs, by
 	// the freezer as the cutoff threshold and by clique as the snapshot trust limit.
-	ImmutabilityThreshold = 90000
+	FullImmutabilityThreshold = 90000
+
+	// LightImmutabilityThreshold is the number of blocks after which a header chain
+	// segment is considered immutable for the light client (i.e. soft finality). It is
+	// used by the downloader as a hard limit against deep ancestors, by the blockchain
+	// against deep reorgs, and by the light pruner as the pruning validity guarantee.
+	LightImmutabilityThreshold = 30000
 )
diff --git a/trie/database.go b/trie/database.go
index 00d4baddf..4f310a776 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -693,7 +693,7 @@ func (db *Database) Cap(limit common.StorageSize) error {
 //
 // Note, this method is a non-synchronized mutator. It is unsafe to call this
 // concurrently with other mutators.
-func (db *Database) Commit(node common.Hash, report bool) error {
+func (db *Database) Commit(node common.Hash, report bool, callback func(common.Hash)) error {
 	// Create a database batch to flush persistent data out. It is important that
 	// outside code doesn't see an inconsistent state (referenced data removed from
 	// memory cache during commit but not yet in persistent storage). This is ensured
@@ -732,7 +732,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 	nodes, storage := len(db.dirties), db.dirtiesSize

 	uncacher := &cleaner{db}
-	if err := db.commit(node, batch, uncacher); err != nil {
+	if err := db.commit(node, batch, uncacher, callback); err != nil {
 		log.Error("Failed to commit trie from trie database", "err", err)
 		return err
 	}
@@ -771,7 +771,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 }

 // commit is the private locked version of Commit.
-func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
+func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error {
 	// If the node does not exist, it's a previously committed node
 	node, ok := db.dirties[hash]
 	if !ok {
@@ -780,7 +780,7 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane
 	var err error
 	node.forChilds(func(child common.Hash) {
 		if err == nil {
-			err = db.commit(child, batch, uncacher)
+			err = db.commit(child, batch, uncacher, callback)
 		}
 	})
 	if err != nil {
@@ -789,6 +789,9 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane
 	if err := batch.Put(hash[:], node.rlp()); err != nil {
 		return err
 	}
+	if callback != nil {
+		callback(hash)
+	}
 	// If we've reached an optimal batch size, commit and start over
 	if batch.ValueSize() >= ethdb.IdealBatchSize {
 		if err := batch.Write(); err != nil {
diff --git a/trie/iterator_test.go b/trie/iterator_test.go
index 54dd3069f..75a0a99e5 100644
--- a/trie/iterator_test.go
+++ b/trie/iterator_test.go
@@ -301,7 +301,7 @@ func testIteratorContinueAfterError(t *testing.T, memonly bool) {
 	}
 	tr.Commit(nil)
 	if !memonly {
-		triedb.Commit(tr.Hash(), true)
+		triedb.Commit(tr.Hash(), true, nil)
 	}
 	wantNodeCount := checkIteratorNoDups(t, tr.NodeIterator(nil), nil)
@@ -392,7 +392,7 @@ func testIteratorContinueAfterSeekError(t *testing.T, memonly bool) {
 	}
 	root, _ := ctr.Commit(nil)
 	if !memonly {
-		triedb.Commit(root, true)
+		triedb.Commit(root, true, nil)
 	}
 	barNodeHash := common.HexToHash("05041990364eb72fcb1127652ce40d8bab765f2bfe53225b1170d276cc101c2e")
 	var (
diff --git a/trie/trie_test.go b/trie/trie_test.go
index 172572ddd..588562146 100644
--- a/trie/trie_test.go
+++ b/trie/trie_test.go
@@ -88,7 +88,7 @@ func testMissingNode(t *testing.T, memonly bool) {
 	updateString(trie, "123456", "asdfasdfasdfasdfasdfasdfasdfasdf")
 	root, _ := trie.Commit(nil)
 	if !memonly {
-		triedb.Commit(root, true)
+		triedb.Commit(root, true, nil)
 	}
 	trie, _ = New(root, triedb)
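The callback threaded through trie.Database.Commit is what makes the pruning in light/postprocess.go possible: it reports every node hash actually flushed to disk, so callers can build an exact live-node set. A hedged example of consuming it (collectCommitted is illustrative; the Commit signature matches the change above):

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/trie"
)

// collectCommitted commits the trie rooted at root and records the hash of
// every node written to disk, the way the CHT and bloom-trie indexers build
// their live sets before sweeping stale table entries.
func collectCommitted(triedb *trie.Database, root common.Hash) (map[common.Hash]struct{}, error) {
	committed := make(map[common.Hash]struct{})
	err := triedb.Commit(root, false, func(hash common.Hash) {
		committed[hash] = struct{}{}
	})
	return committed, err
}
```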