les: historical data garbage collection (#19570)

This change introduces garbage collection for the light client. Historical
chain data is deleted periodically. If you want to disable the GC, use
the --light.nopruning flag.
release/1.9
gary rong 4 years ago committed by GitHub
parent b8dd0890b3
commit 6eef141aef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      cmd/geth/main.go
  2. 1
      cmd/geth/usage.go
  3. 18
      cmd/utils/flags.go
  4. 2
      consensus/clique/clique.go
  5. 8
      core/blockchain.go
  6. 9
      core/chain_indexer.go
  7. 4
      core/chain_indexer_test.go
  8. 2
      core/chain_makers.go
  9. 8
      core/dao_test.go
  10. 2
      core/genesis.go
  11. 33
      core/rawdb/accessors_chain.go
  12. 33
      core/rawdb/accessors_chain_test.go
  13. 22
      core/rawdb/accessors_indexes.go
  14. 45
      core/rawdb/accessors_indexes_test.go
  15. 8
      core/rawdb/freezer.go
  16. 6
      core/state/statedb_test.go
  17. 4
      eth/api_backend.go
  18. 5
      eth/bloombits.go
  19. 9
      eth/config.go
  20. 20
      eth/downloader/downloader.go
  21. 3
      eth/downloader/downloader_test.go
  22. 2
      eth/downloader/testchain_test.go
  23. 6
      eth/gen_config.go
  24. 2
      graphql/graphql.go
  25. 25
      internal/ethapi/api.go
  26. 2
      internal/ethapi/backend.go
  27. 7
      les/api_backend.go
  28. 9
      les/client.go
  29. 10
      les/odr_requests.go
  30. 2
      les/odr_test.go
  31. 98
      les/pruner.go
  32. 197
      les/pruner_test.go
  33. 2
      les/request_test.go
  34. 4
      les/server.go
  35. 4
      les/sync_test.go
  36. 16
      les/test_helper.go
  37. 2
      les/ulc_test.go
  38. 20
      light/lightchain.go
  39. 9
      light/odr.go
  40. 212
      light/odr_util.go
  41. 159
      light/postprocess.go
  42. 10
      params/network_params.go
  43. 11
      trie/database.go
  44. 4
      trie/iterator_test.go
  45. 2
      trie/trie_test.go

@ -98,6 +98,7 @@ var (
utils.LightEgressFlag, utils.LightEgressFlag,
utils.LightMaxPeersFlag, utils.LightMaxPeersFlag,
utils.LegacyLightPeersFlag, utils.LegacyLightPeersFlag,
utils.LightNoPruneFlag,
utils.LightKDFFlag, utils.LightKDFFlag,
utils.UltraLightServersFlag, utils.UltraLightServersFlag,
utils.UltraLightFractionFlag, utils.UltraLightFractionFlag,

@ -96,6 +96,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.UltraLightServersFlag, utils.UltraLightServersFlag,
utils.UltraLightFractionFlag, utils.UltraLightFractionFlag,
utils.UltraLightOnlyAnnounceFlag, utils.UltraLightOnlyAnnounceFlag,
utils.LightNoPruneFlag,
}, },
}, },
{ {

@ -282,6 +282,10 @@ var (
Name: "ulc.onlyannounce", Name: "ulc.onlyannounce",
Usage: "Ultra light server sends announcements only", Usage: "Ultra light server sends announcements only",
} }
LightNoPruneFlag = cli.BoolFlag{
Name: "light.nopruning",
Usage: "Disable ancient light chain data pruning",
}
// Ethash settings // Ethash settings
EthashCacheDirFlag = DirectoryFlag{ EthashCacheDirFlag = DirectoryFlag{
Name: "ethash.cachedir", Name: "ethash.cachedir",
@ -1070,6 +1074,9 @@ func setLes(ctx *cli.Context, cfg *eth.Config) {
if ctx.GlobalIsSet(UltraLightOnlyAnnounceFlag.Name) { if ctx.GlobalIsSet(UltraLightOnlyAnnounceFlag.Name) {
cfg.UltraLightOnlyAnnounce = ctx.GlobalBool(UltraLightOnlyAnnounceFlag.Name) cfg.UltraLightOnlyAnnounce = ctx.GlobalBool(UltraLightOnlyAnnounceFlag.Name)
} }
if ctx.GlobalIsSet(LightNoPruneFlag.Name) {
cfg.LightNoPrune = ctx.GlobalBool(LightNoPruneFlag.Name)
}
} }
// makeDatabaseHandles raises out the number of allowed file handles per process // makeDatabaseHandles raises out the number of allowed file handles per process
@ -1800,12 +1807,17 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node) ethdb.Database {
var ( var (
cache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100 cache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100
handles = makeDatabaseHandles() handles = makeDatabaseHandles()
err error
chainDb ethdb.Database
) )
name := "chaindata"
if ctx.GlobalString(SyncModeFlag.Name) == "light" { if ctx.GlobalString(SyncModeFlag.Name) == "light" {
name = "lightchaindata" name := "lightchaindata"
chainDb, err = stack.OpenDatabase(name, cache, handles, "")
} else {
name := "chaindata"
chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "")
} }
chainDb, err := stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "")
if err != nil { if err != nil {
Fatalf("Could not open database: %v", err) Fatalf("Could not open database: %v", err)
} }

@ -369,7 +369,7 @@ func (c *Clique) snapshot(chain consensus.ChainReader, number uint64, hash commo
// at a checkpoint block without a parent (light client CHT), or we have piled // at a checkpoint block without a parent (light client CHT), or we have piled
// up more headers than allowed to be reorged (chain reinit from a freezer), // up more headers than allowed to be reorged (chain reinit from a freezer),
// consider the checkpoint trusted and snapshot it. // consider the checkpoint trusted and snapshot it.
if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.ImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) { if number == 0 || (number%c.config.Epoch == 0 && (len(headers) > params.FullImmutabilityThreshold || chain.GetHeaderByNumber(number-1) == nil)) {
checkpoint := chain.GetHeaderByNumber(number) checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil { if checkpoint != nil {
hash := checkpoint.Hash() hash := checkpoint.Hash()

@ -901,14 +901,14 @@ func (bc *BlockChain) Stop() {
recent := bc.GetBlockByNumber(number - offset) recent := bc.GetBlockByNumber(number - offset)
log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true); err != nil { if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err) log.Error("Failed to commit recent state trie", "err", err)
} }
} }
} }
if snapBase != (common.Hash{}) { if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase) log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true); err != nil { if err := triedb.Commit(snapBase, true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err) log.Error("Failed to commit recent state trie", "err", err)
} }
} }
@ -1442,7 +1442,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// If we're running an archive node, always flush // If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled { if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false); err != nil { if err := triedb.Commit(root, false, nil); err != nil {
return NonStatTy, err return NonStatTy, err
} }
} else { } else {
@ -1476,7 +1476,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory) log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
} }
// Flush an entire trie and restart the counters // Flush an entire trie and restart the counters
triedb.Commit(header.Root, true) triedb.Commit(header.Root, true, nil)
lastWrite = chosen lastWrite = chosen
bc.gcproc = 0 bc.gcproc = 0
} }

@ -46,6 +46,9 @@ type ChainIndexerBackend interface {
// Commit finalizes the section metadata and stores it into the database. // Commit finalizes the section metadata and stores it into the database.
Commit() error Commit() error
// Prune deletes the chain index older than the given threshold.
Prune(threshold uint64) error
} }
// ChainIndexerChain interface is used for connecting the indexer to a blockchain // ChainIndexerChain interface is used for connecting the indexer to a blockchain
@ -386,7 +389,6 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section) c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing // Reset and partial processing
if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0) c.setValidSections(0)
return common.Hash{}, err return common.Hash{}, err
@ -459,6 +461,11 @@ func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
} }
} }
// Prune deletes all chain data older than given threshold.
func (c *ChainIndexer) Prune(threshold uint64) error {
return c.backend.Prune(threshold)
}
// loadValidSections reads the number of valid sections from the index database // loadValidSections reads the number of valid sections from the index database
// and caches is into the local state. // and caches is into the local state.
func (c *ChainIndexer) loadValidSections() { func (c *ChainIndexer) loadValidSections() {

@ -236,3 +236,7 @@ func (b *testChainIndexBackend) Commit() error {
} }
return nil return nil
} }
func (b *testChainIndexBackend) Prune(threshold uint64) error {
return nil
}

@ -220,7 +220,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
if err != nil { if err != nil {
panic(fmt.Sprintf("state write error: %v", err)) panic(fmt.Sprintf("state write error: %v", err))
} }
if err := statedb.Database().TrieDB().Commit(root, false); err != nil { if err := statedb.Database().TrieDB().Commit(root, false, nil); err != nil {
panic(fmt.Sprintf("trie write error: %v", err)) panic(fmt.Sprintf("trie write error: %v", err))
} }
return block, b.receipts return block, b.receipts

@ -79,7 +79,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil { if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import contra-fork chain for expansion: %v", err) t.Fatalf("failed to import contra-fork chain for expansion: %v", err)
} }
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit contra-fork head for expansion: %v", err) t.Fatalf("failed to commit contra-fork head for expansion: %v", err)
} }
blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
@ -104,7 +104,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil { if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import pro-fork chain for expansion: %v", err) t.Fatalf("failed to import pro-fork chain for expansion: %v", err)
} }
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit pro-fork head for expansion: %v", err) t.Fatalf("failed to commit pro-fork head for expansion: %v", err)
} }
blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
@ -130,7 +130,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil { if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import contra-fork chain for expansion: %v", err) t.Fatalf("failed to import contra-fork chain for expansion: %v", err)
} }
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit contra-fork head for expansion: %v", err) t.Fatalf("failed to commit contra-fork head for expansion: %v", err)
} }
blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
@ -150,7 +150,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil { if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import pro-fork chain for expansion: %v", err) t.Fatalf("failed to import pro-fork chain for expansion: %v", err)
} }
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
t.Fatalf("failed to commit pro-fork head for expansion: %v", err) t.Fatalf("failed to commit pro-fork head for expansion: %v", err)
} }
blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})

@ -285,7 +285,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block {
head.Difficulty = params.GenesisDifficulty head.Difficulty = params.GenesisDifficulty
} }
statedb.Commit(false) statedb.Commit(false)
statedb.Database().TrieDB().Commit(root, true) statedb.Database().TrieDB().Commit(root, true, nil)
return types.NewBlock(head, nil, nil, nil) return types.NewBlock(head, nil, nil, nil)
} }

@ -80,6 +80,39 @@ func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash {
return hashes return hashes
} }
// ReadAllCanonicalHashes retrieves all canonical number and hash mappings at the
// certain chain range. If the accumulated entries reaches the given threshold,
// abort the iteration and return the semi-finish result.
func ReadAllCanonicalHashes(db ethdb.Iteratee, from uint64, to uint64, limit int) ([]uint64, []common.Hash) {
// Short circuit if the limit is 0.
if limit == 0 {
return nil, nil
}
var (
numbers []uint64
hashes []common.Hash
)
// Construct the key prefix of start point.
start, end := headerHashKey(from), headerHashKey(to)
it := db.NewIterator(nil, start)
defer it.Release()
for it.Next() {
if bytes.Compare(it.Key(), end) >= 0 {
break
}
if key := it.Key(); len(key) == len(headerPrefix)+8+1 && bytes.Equal(key[len(key)-1:], headerHashSuffix) {
numbers = append(numbers, binary.BigEndian.Uint64(key[len(headerPrefix):len(headerPrefix)+8]))
hashes = append(hashes, common.BytesToHash(it.Value()))
// If the accumulated entries reaches the limit threshold, return.
if len(numbers) >= limit {
break
}
}
}
return numbers, hashes
}
// ReadHeaderNumber returns the header number assigned to a hash. // ReadHeaderNumber returns the header number assigned to a hash.
func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 {
data, _ := db.Get(headerNumberKey(hash)) data, _ := db.Get(headerNumberKey(hash))

@ -23,6 +23,7 @@ import (
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"os" "os"
"reflect"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -424,3 +425,35 @@ func TestAncientStorage(t *testing.T) {
t.Fatalf("invalid td returned") t.Fatalf("invalid td returned")
} }
} }
func TestCanonicalHashIteration(t *testing.T) {
var cases = []struct {
from, to uint64
limit int
expect []uint64
}{
{1, 8, 0, nil},
{1, 8, 1, []uint64{1}},
{1, 8, 10, []uint64{1, 2, 3, 4, 5, 6, 7}},
{1, 9, 10, []uint64{1, 2, 3, 4, 5, 6, 7, 8}},
{2, 9, 10, []uint64{2, 3, 4, 5, 6, 7, 8}},
{9, 10, 10, nil},
}
// Test empty db iteration
db := NewMemoryDatabase()
numbers, _ := ReadAllCanonicalHashes(db, 0, 10, 10)
if len(numbers) != 0 {
t.Fatalf("No entry should be returned to iterate an empty db")
}
// Fill database with testing data.
for i := uint64(1); i <= 8; i++ {
WriteCanonicalHash(db, common.Hash{}, i)
WriteTd(db, common.Hash{}, i, big.NewInt(10)) // Write some interferential data
}
for i, c := range cases {
numbers, _ := ReadAllCanonicalHashes(db, c.from, c.to, c.limit)
if !reflect.DeepEqual(numbers, c.expect) {
t.Fatalf("Case %d failed, want %v, got %v", i, c.expect, numbers)
}
}
}

@ -17,6 +17,7 @@
package rawdb package rawdb
import ( import (
"bytes"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -151,3 +152,24 @@ func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head comm
log.Crit("Failed to store bloom bits", "err", err) log.Crit("Failed to store bloom bits", "err", err)
} }
} }
// DeleteBloombits removes all compressed bloom bits vector belonging to the
// given section range and bit index.
func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{})
it := db.NewIterator(nil, start)
defer it.Release()
for it.Next() {
if bytes.Compare(it.Key(), end) >= 0 {
break
}
if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 {
continue
}
db.Delete(it.Key())
}
if it.Error() != nil {
log.Crit("Failed to delete bloom bits", "err", it.Error())
}
}

@ -17,12 +17,14 @@
package rawdb package rawdb
import ( import (
"bytes"
"math/big" "math/big"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
@ -106,3 +108,46 @@ func TestLookupStorage(t *testing.T) {
}) })
} }
} }
func TestDeleteBloomBits(t *testing.T) {
// Prepare testing data
db := NewMemoryDatabase()
for i := uint(0); i < 2; i++ {
for s := uint64(0); s < 2; s++ {
WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02})
WriteBloomBits(db, i, s, params.RinkebyGenesisHash, []byte{0x01, 0x02})
}
}
check := func(bit uint, section uint64, head common.Hash, exist bool) {
bits, _ := ReadBloomBits(db, bit, section, head)
if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) {
t.Fatalf("Bloombits mismatch")
}
if !exist && len(bits) > 0 {
t.Fatalf("Bloombits should be removed")
}
}
// Check the existence of written data.
check(0, 0, params.MainnetGenesisHash, true)
check(0, 0, params.RinkebyGenesisHash, true)
// Check the existence of deleted data.
DeleteBloombits(db, 0, 0, 1)
check(0, 0, params.MainnetGenesisHash, false)
check(0, 0, params.RinkebyGenesisHash, false)
check(0, 1, params.MainnetGenesisHash, true)
check(0, 1, params.RinkebyGenesisHash, true)
// Check the existence of deleted data.
DeleteBloombits(db, 0, 0, 2)
check(0, 0, params.MainnetGenesisHash, false)
check(0, 0, params.RinkebyGenesisHash, false)
check(0, 1, params.MainnetGenesisHash, false)
check(0, 1, params.RinkebyGenesisHash, false)
// Bit1 shouldn't be affect.
check(1, 0, params.MainnetGenesisHash, true)
check(1, 0, params.RinkebyGenesisHash, true)
check(1, 1, params.MainnetGenesisHash, true)
check(1, 1, params.RinkebyGenesisHash, true)
}

@ -287,12 +287,12 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
backoff = true backoff = true
continue continue
case *number < params.ImmutabilityThreshold: case *number < params.FullImmutabilityThreshold:
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold) log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.FullImmutabilityThreshold)
backoff = true backoff = true
continue continue
case *number-params.ImmutabilityThreshold <= f.frozen: case *number-params.FullImmutabilityThreshold <= f.frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
backoff = true backoff = true
continue continue
@ -304,7 +304,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
continue continue
} }
// Seems we have data ready to be frozen, process in usable batches // Seems we have data ready to be frozen, process in usable batches
limit := *number - params.ImmutabilityThreshold limit := *number - params.FullImmutabilityThreshold
if limit-f.frozen > freezerBatchLimit { if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit limit = f.frozen + freezerBatchLimit
} }

@ -55,7 +55,7 @@ func TestUpdateLeaks(t *testing.T) {
} }
root := state.IntermediateRoot(false) root := state.IntermediateRoot(false)
if err := state.Database().TrieDB().Commit(root, false); err != nil { if err := state.Database().TrieDB().Commit(root, false, nil); err != nil {
t.Errorf("can not commit trie %v to persistent database", root.Hex()) t.Errorf("can not commit trie %v to persistent database", root.Hex())
} }
@ -106,7 +106,7 @@ func TestIntermediateLeaks(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to commit transition state: %v", err) t.Fatalf("failed to commit transition state: %v", err)
} }
if err = transState.Database().TrieDB().Commit(transRoot, false); err != nil { if err = transState.Database().TrieDB().Commit(transRoot, false, nil); err != nil {
t.Errorf("can not commit trie %v to persistent database", transRoot.Hex()) t.Errorf("can not commit trie %v to persistent database", transRoot.Hex())
} }
@ -114,7 +114,7 @@ func TestIntermediateLeaks(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to commit final state: %v", err) t.Fatalf("failed to commit final state: %v", err)
} }
if err = finalState.Database().TrieDB().Commit(finalRoot, false); err != nil { if err = finalState.Database().TrieDB().Commit(finalRoot, false, nil); err != nil {
t.Errorf("can not commit trie %v to persistent database", finalRoot.Hex()) t.Errorf("can not commit trie %v to persistent database", finalRoot.Hex())
} }

@ -185,8 +185,8 @@ func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*typ
return logs, nil return logs, nil
} }
func (b *EthAPIBackend) GetTd(blockHash common.Hash) *big.Int { func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
return b.eth.blockchain.GetTdByHash(blockHash) return b.eth.blockchain.GetTdByHash(hash)
} }
func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) { func (b *EthAPIBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) {

@ -136,3 +136,8 @@ func (b *BloomIndexer) Commit() error {
} }
return batch.Write() return batch.Write()
} }
// PruneSections returns an empty error since we don't support pruning here.
func (b *BloomIndexer) Prune(threshold uint64) error {
return nil
}

@ -122,10 +122,11 @@ type Config struct {
Whitelist map[uint64]common.Hash `toml:"-"` Whitelist map[uint64]common.Hash `toml:"-"`
// Light client options // Light client options
LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
LightIngress int `toml:",omitempty"` // Incoming bandwidth limit for light servers LightIngress int `toml:",omitempty"` // Incoming bandwidth limit for light servers
LightEgress int `toml:",omitempty"` // Outgoing bandwidth limit for light servers LightEgress int `toml:",omitempty"` // Outgoing bandwidth limit for light servers
LightPeers int `toml:",omitempty"` // Maximum number of LES client peers LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
LightNoPrune bool `toml:",omitempty"` // Whether to disable light chain pruning
// Ultra Light client options // Ultra Light client options
UltraLightServers []string `toml:",omitempty"` // List of trusted ultra light servers UltraLightServers []string `toml:",omitempty"` // List of trusted ultra light servers

@ -42,7 +42,6 @@ var (
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request
@ -56,10 +55,11 @@ var (
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain
maxForkAncestry uint64 = params.ImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it) fullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
@ -490,10 +490,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// The peer would start to feed us valid blocks until head, resulting in all of // The peer would start to feed us valid blocks until head, resulting in all of
// the blocks might be written into the ancient store. A following mini-reorg // the blocks might be written into the ancient store. A following mini-reorg
// could cause issues. // could cause issues.
if d.checkpoint != 0 && d.checkpoint > maxForkAncestry+1 { if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint d.ancientLimit = d.checkpoint
} else if height > maxForkAncestry+1 { } else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - maxForkAncestry - 1 d.ancientLimit = height - fullMaxForkAncestry - 1
} }
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
// If a part of blockchain data has already been written into active store, // If a part of blockchain data has already been written into active store,
@ -727,6 +727,10 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
// Recap floor value for binary search // Recap floor value for binary search
maxForkAncestry := fullMaxForkAncestry
if d.getMode() == LightSync {
maxForkAncestry = lightMaxForkAncestry
}
if localHeight >= maxForkAncestry { if localHeight >= maxForkAncestry {
// We're above the max reorg threshold, find the earliest fork point // We're above the max reorg threshold, find the earliest fork point
floor = int64(localHeight - maxForkAncestry) floor = int64(localHeight - maxForkAncestry)

@ -37,7 +37,8 @@ import (
// Reduce some of the parameters to make the tester faster. // Reduce some of the parameters to make the tester faster.
func init() { func init() {
maxForkAncestry = 10000 fullMaxForkAncestry = 10000
lightMaxForkAncestry = 10000
blockCacheItems = 1024 blockCacheItems = 1024
fsHeaderContCheck = 500 * time.Millisecond fsHeaderContCheck = 500 * time.Millisecond
} }

@ -45,7 +45,7 @@ var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
func init() { func init() {
var forkLen = int(maxForkAncestry + 50) var forkLen = int(fullMaxForkAncestry + 50)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(3) wg.Add(3)
go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()

@ -29,6 +29,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
LightIngress int `toml:",omitempty"` LightIngress int `toml:",omitempty"`
LightEgress int `toml:",omitempty"` LightEgress int `toml:",omitempty"`
LightPeers int `toml:",omitempty"` LightPeers int `toml:",omitempty"`
LightNoPrune bool `toml:",omitempty"`
UltraLightServers []string `toml:",omitempty"` UltraLightServers []string `toml:",omitempty"`
UltraLightFraction int `toml:",omitempty"` UltraLightFraction int `toml:",omitempty"`
UltraLightOnlyAnnounce bool `toml:",omitempty"` UltraLightOnlyAnnounce bool `toml:",omitempty"`
@ -66,6 +67,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.LightIngress = c.LightIngress enc.LightIngress = c.LightIngress
enc.LightEgress = c.LightEgress enc.LightEgress = c.LightEgress
enc.LightPeers = c.LightPeers enc.LightPeers = c.LightPeers
enc.LightNoPrune = c.LightNoPrune
enc.UltraLightServers = c.UltraLightServers enc.UltraLightServers = c.UltraLightServers
enc.UltraLightFraction = c.UltraLightFraction enc.UltraLightFraction = c.UltraLightFraction
enc.UltraLightOnlyAnnounce = c.UltraLightOnlyAnnounce enc.UltraLightOnlyAnnounce = c.UltraLightOnlyAnnounce
@ -107,6 +109,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
LightIngress *int `toml:",omitempty"` LightIngress *int `toml:",omitempty"`
LightEgress *int `toml:",omitempty"` LightEgress *int `toml:",omitempty"`
LightPeers *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"`
LightNoPrune *bool `toml:",omitempty"`
UltraLightServers []string `toml:",omitempty"` UltraLightServers []string `toml:",omitempty"`
UltraLightFraction *int `toml:",omitempty"` UltraLightFraction *int `toml:",omitempty"`
UltraLightOnlyAnnounce *bool `toml:",omitempty"` UltraLightOnlyAnnounce *bool `toml:",omitempty"`
@ -171,6 +174,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.LightPeers != nil { if dec.LightPeers != nil {
c.LightPeers = *dec.LightPeers c.LightPeers = *dec.LightPeers
} }
if dec.LightNoPrune != nil {
c.LightNoPrune = *dec.LightNoPrune
}
if dec.UltraLightServers != nil { if dec.UltraLightServers != nil {
c.UltraLightServers = dec.UltraLightServers c.UltraLightServers = dec.UltraLightServers
} }

@ -584,7 +584,7 @@ func (b *Block) TotalDifficulty(ctx context.Context) (hexutil.Big, error) {
} }
h = header.Hash() h = header.Hash()
} }
return hexutil.Big(*b.backend.GetTd(h)), nil return hexutil.Big(*b.backend.GetTd(ctx, h)), nil
} }
// BlockNumberArgs encapsulates arguments to accessors that specify a block number. // BlockNumberArgs encapsulates arguments to accessors that specify a block number.

@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -625,7 +624,7 @@ func (s *PublicBlockChainAPI) GetProof(ctx context.Context, address common.Addre
func (s *PublicBlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) { func (s *PublicBlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
header, err := s.b.HeaderByNumber(ctx, number) header, err := s.b.HeaderByNumber(ctx, number)
if header != nil && err == nil { if header != nil && err == nil {
response := s.rpcMarshalHeader(header) response := s.rpcMarshalHeader(ctx, header)
if number == rpc.PendingBlockNumber { if number == rpc.PendingBlockNumber {
// Pending header need to nil out a few fields // Pending header need to nil out a few fields
for _, field := range []string{"hash", "nonce", "miner"} { for _, field := range []string{"hash", "nonce", "miner"} {
@ -641,7 +640,7 @@ func (s *PublicBlockChainAPI) GetHeaderByNumber(ctx context.Context, number rpc.
func (s *PublicBlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) map[string]interface{} { func (s *PublicBlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.Hash) map[string]interface{} {
header, _ := s.b.HeaderByHash(ctx, hash) header, _ := s.b.HeaderByHash(ctx, hash)
if header != nil { if header != nil {
return s.rpcMarshalHeader(header) return s.rpcMarshalHeader(ctx, header)
} }
return nil return nil
} }
@ -654,7 +653,7 @@ func (s *PublicBlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.H
func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) { func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) {
block, err := s.b.BlockByNumber(ctx, number) block, err := s.b.BlockByNumber(ctx, number)
if block != nil && err == nil { if block != nil && err == nil {
response, err := s.rpcMarshalBlock(block, true, fullTx) response, err := s.rpcMarshalBlock(ctx, block, true, fullTx)
if err == nil && number == rpc.PendingBlockNumber { if err == nil && number == rpc.PendingBlockNumber {
// Pending blocks need to nil out a few fields // Pending blocks need to nil out a few fields
for _, field := range []string{"hash", "nonce", "miner"} { for _, field := range []string{"hash", "nonce", "miner"} {
@ -671,7 +670,7 @@ func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.B
func (s *PublicBlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { func (s *PublicBlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
block, err := s.b.BlockByHash(ctx, hash) block, err := s.b.BlockByHash(ctx, hash)
if block != nil { if block != nil {
return s.rpcMarshalBlock(block, true, fullTx) return s.rpcMarshalBlock(ctx, block, true, fullTx)
} }
return nil, err return nil, err
} }
@ -687,7 +686,7 @@ func (s *PublicBlockChainAPI) GetUncleByBlockNumberAndIndex(ctx context.Context,
return nil, nil return nil, nil
} }
block = types.NewBlockWithHeader(uncles[index]) block = types.NewBlockWithHeader(uncles[index])
return s.rpcMarshalBlock(block, false, false) return s.rpcMarshalBlock(ctx, block, false, false)
} }
return nil, err return nil, err
} }
@ -703,7 +702,7 @@ func (s *PublicBlockChainAPI) GetUncleByBlockHashAndIndex(ctx context.Context, b
return nil, nil return nil, nil
} }
block = types.NewBlockWithHeader(uncles[index]) block = types.NewBlockWithHeader(uncles[index])
return s.rpcMarshalBlock(block, false, false) return s.rpcMarshalBlock(ctx, block, false, false)
} }
return nil, err return nil, err
} }
@ -1173,21 +1172,21 @@ func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]i
// rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires // rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires
// a `PublicBlockchainAPI`. // a `PublicBlockchainAPI`.
func (s *PublicBlockChainAPI) rpcMarshalHeader(header *types.Header) map[string]interface{} { func (s *PublicBlockChainAPI) rpcMarshalHeader(ctx context.Context, header *types.Header) map[string]interface{} {
fields := RPCMarshalHeader(header) fields := RPCMarshalHeader(header)
fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(header.Hash())) fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(ctx, header.Hash()))
return fields return fields
} }
// rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires // rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires
// a `PublicBlockchainAPI`. // a `PublicBlockchainAPI`.
func (s *PublicBlockChainAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { func (s *PublicBlockChainAPI) rpcMarshalBlock(ctx context.Context, b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields, err := RPCMarshalBlock(b, inclTx, fullTx) fields, err := RPCMarshalBlock(b, inclTx, fullTx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if inclTx { if inclTx {
fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(b.Hash())) fields["totalDifficulty"] = (*hexutil.Big)(s.b.GetTd(ctx, b.Hash()))
} }
return fields, err return fields, err
} }
@ -1393,8 +1392,8 @@ func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context,
// GetTransactionReceipt returns the transaction receipt for the given transaction hash. // GetTransactionReceipt returns the transaction receipt for the given transaction hash.
func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) {
tx, blockHash, blockNumber, index := rawdb.ReadTransaction(s.b.ChainDb(), hash) tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash)
if tx == nil { if err != nil {
return nil, nil return nil, nil
} }
receipts, err := s.b.GetReceipts(ctx, blockHash) receipts, err := s.b.GetReceipts(ctx, blockHash)

@ -59,7 +59,7 @@ type Backend interface {
StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error)
StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error)
GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error)
GetTd(hash common.Hash) *big.Int GetTd(ctx context.Context, hash common.Hash) *big.Int
GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error)
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription

@ -162,8 +162,11 @@ func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*typ
return nil, nil return nil, nil
} }
func (b *LesApiBackend) GetTd(hash common.Hash) *big.Int { func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
return b.eth.blockchain.GetTdByHash(hash) if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil {
return b.eth.blockchain.GetTdOdr(ctx, hash, *number)
}
return nil
} }
func (b *LesApiBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) { func (b *LesApiBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) {

@ -62,6 +62,7 @@ type LightEthereum struct {
serverPool *serverPool serverPool *serverPool
valueTracker *lpc.ValueTracker valueTracker *lpc.ValueTracker
dialCandidates enode.Iterator dialCandidates enode.Iterator
pruner *pruner
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
@ -121,8 +122,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
leth.relay = newLesTxRelay(peers, leth.retriever) leth.relay = newLesTxRelay(peers, leth.retriever)
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations) leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations, config.LightNoPrune)
leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency) leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency, config.LightNoPrune)
leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)
checkpoint := config.Checkpoint checkpoint := config.Checkpoint
@ -149,6 +150,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
leth.chtIndexer.Start(leth.blockchain) leth.chtIndexer.Start(leth.blockchain)
leth.bloomIndexer.Start(leth.blockchain) leth.bloomIndexer.Start(leth.blockchain)
// Start a light chain pruner to delete useless historical data.
leth.pruner = newPruner(chainDb, leth.chtIndexer, leth.bloomTrieIndexer)
// Rewind the chain in case of an incompatible config upgrade. // Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok { if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat) log.Warn("Rewinding chain to upgrade configuration", "err", compat)
@ -302,6 +306,7 @@ func (s *LightEthereum) Stop() error {
s.handler.stop() s.handler.stop()
s.txPool.Stop() s.txPool.Stop()
s.engine.Close() s.engine.Close()
s.pruner.close()
s.eventMux.Stop() s.eventMux.Stop()
s.chainDb.Close() s.chainDb.Close()
s.wg.Wait() s.wg.Wait()

@ -110,14 +110,16 @@ func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error {
body := bodies[0] body := bodies[0]
// Retrieve our stored header and validate block content against it // Retrieve our stored header and validate block content against it
header := rawdb.ReadHeader(db, r.Hash, r.Number) if r.Header == nil {
if header == nil { r.Header = rawdb.ReadHeader(db, r.Hash, r.Number)
}
if r.Header == nil {
return errHeaderUnavailable return errHeaderUnavailable
} }
if header.TxHash != types.DeriveSha(types.Transactions(body.Transactions)) { if r.Header.TxHash != types.DeriveSha(types.Transactions(body.Transactions)) {
return errTxHashMismatch return errTxHashMismatch
} }
if header.UncleHash != types.CalcUncleHash(body.Uncles) { if r.Header.UncleHash != types.CalcUncleHash(body.Uncles) {
return errUncleHashMismatch return errUncleHashMismatch
} }
// Validations passed, encode and store RLP // Validations passed, encode and store RLP

@ -183,7 +183,7 @@ func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainCon
// testOdr tests odr requests whose validation guaranteed by block headers. // testOdr tests odr requests whose validation guaranteed by block headers.
func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) { func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) {
// Assemble the test environment // Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true)
defer tearDown() defer tearDown()
// Ensure the client has synced all necessary data. // Ensure the client has synced all necessary data.

@ -0,0 +1,98 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
// pruner is responsible for pruning historical light chain data.
type pruner struct {
db ethdb.Database
indexers []*core.ChainIndexer
closeCh chan struct{}
wg sync.WaitGroup
}
// newPruner returns a light chain pruner instance.
func newPruner(db ethdb.Database, indexers ...*core.ChainIndexer) *pruner {
pruner := &pruner{
db: db,
indexers: indexers,
closeCh: make(chan struct{}),
}
pruner.wg.Add(1)
go pruner.loop()
return pruner
}
// close notifies all background goroutines belonging to pruner to exit.
func (p *pruner) close() {
close(p.closeCh)
p.wg.Wait()
}
// loop periodically queries the status of chain indexers and prunes useless
// historical chain data. Notably, whenever Geth restarts, it will iterate
// all historical sections even they don't exist at all(below checkpoint) so
// that light client can prune cached chain data that was ODRed after pruning
// that section.
func (p *pruner) loop() {
defer p.wg.Done()
// cleanTicker is the ticker used to trigger a history clean 2 times a day.
var cleanTicker = time.NewTicker(12 * time.Hour)
// pruning finds the sections that have been processed by all indexers
// and deletes all historical chain data.
// Note, if some indexers don't support pruning(e.g. eth.BloomIndexer),
// pruning operations can be silently ignored.
pruning := func() {
min := uint64(math.MaxUint64)
for _, indexer := range p.indexers {
sections, _, _ := indexer.Sections()
if sections < min {
min = sections
}
}
// Always keep the latest section data in database.
if min < 2 || len(p.indexers) == 0 {
return
}
for _, indexer := range p.indexers {
if err := indexer.Prune(min - 2); err != nil {
log.Debug("Failed to prune historical data", "err", err)
return
}
}
p.db.Compact(nil, nil) // Compact entire database, ensure all removed data are deleted.
}
for {
pruning()
select {
case <-cleanTicker.C:
case <-p.closeCh:
return
}
}
}

@ -0,0 +1,197 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"bytes"
"context"
"encoding/binary"
"testing"
"time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/light"
)
func TestLightPruner(t *testing.T) {
config := light.TestClientIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
for {
cs, _, _ := cIndexer.Sections()
bts, _, _ := btIndexer.Sections()
if cs >= 3 && bts >= 3 {
break
}
time.Sleep(10 * time.Millisecond)
}
}
server, client, tearDown := newClientServerEnv(t, int(3*config.ChtSize+config.ChtConfirms), 2, waitIndexers, nil, 0, false, true, false)
defer tearDown()
// checkDB iterates the chain with given prefix, resolves the block number
// with given callback and ensures this entry should exist or not.
checkDB := func(from, to uint64, prefix []byte, resolve func(key, value []byte) *uint64, exist bool) bool {
it := client.db.NewIterator(prefix, nil)
defer it.Release()
var next = from
for it.Next() {
number := resolve(it.Key(), it.Value())
if number == nil || *number < from {
continue
} else if *number > to {
return true
}
if exist {
if *number != next {
return false
}
next++
} else {
return false
}
}
return true
}
// checkPruned checks and ensures the stale chain data has been pruned.
checkPruned := func(from, to uint64) {
// Iterate canonical hash
if !checkDB(from, to, []byte("h"), func(key, value []byte) *uint64 {
if len(key) == 1+8+1 && bytes.Equal(key[9:10], []byte("n")) {
n := binary.BigEndian.Uint64(key[1:9])
return &n
}
return nil
}, false) {
t.Fatalf("canonical hash mappings are not properly pruned")
}
// Iterate header
if !checkDB(from, to, []byte("h"), func(key, value []byte) *uint64 {
if len(key) == 1+8+32 {
n := binary.BigEndian.Uint64(key[1:9])
return &n
}
return nil
}, false) {
t.Fatalf("headers are not properly pruned")
}
// Iterate body
if !checkDB(from, to, []byte("b"), func(key, value []byte) *uint64 {
if len(key) == 1+8+32 {
n := binary.BigEndian.Uint64(key[1:9])
return &n
}
return nil
}, false) {
t.Fatalf("block bodies are not properly pruned")
}
// Iterate receipts
if !checkDB(from, to, []byte("r"), func(key, value []byte) *uint64 {
if len(key) == 1+8+32 {
n := binary.BigEndian.Uint64(key[1:9])
return &n
}
return nil
}, false) {
t.Fatalf("receipts are not properly pruned")
}
// Iterate td
if !checkDB(from, to, []byte("h"), func(key, value []byte) *uint64 {
if len(key) == 1+8+32+1 && bytes.Equal(key[41:42], []byte("t")) {
n := binary.BigEndian.Uint64(key[1:9])
return &n
}
return nil
}, false) {
t.Fatalf("tds are not properly pruned")
}
}
// Start light pruner.
time.Sleep(1500 * time.Millisecond) // Ensure light client has finished the syncing and indexing
newPruner(client.db, client.chtIndexer, client.bloomTrieIndexer)
time.Sleep(1500 * time.Millisecond) // Ensure pruner have enough time to prune data.
checkPruned(1, config.ChtSize-1)
// Ensure all APIs still work after pruning.
var cases = []struct {
from, to uint64
methodName string
method func(uint64) bool
}{
{
1, 10, "GetHeaderByNumber",
func(n uint64) bool {
_, err := light.GetHeaderByNumber(context.Background(), client.handler.backend.odr, n)
return err == nil
},
},
{
11, 20, "GetCanonicalHash",
func(n uint64) bool {
_, err := light.GetCanonicalHash(context.Background(), client.handler.backend.odr, n)
return err == nil
},
},
{
21, 30, "GetTd",
func(n uint64) bool {
_, err := light.GetTd(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n)
return err == nil
},
},
{
31, 40, "GetBodyRLP",
func(n uint64) bool {
_, err := light.GetBodyRLP(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n)
return err == nil
},
},
{
41, 50, "GetBlock",
func(n uint64) bool {
_, err := light.GetBlock(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n)
return err == nil
},
},
{
51, 60, "GetBlockReceipts",
func(n uint64) bool {
_, err := light.GetBlockReceipts(context.Background(), client.handler.backend.odr, server.handler.blockchain.GetHeaderByNumber(n).Hash(), n)
return err == nil
},
},
}
for _, c := range cases {
for i := c.from; i <= c.to; i++ {
if !c.method(i) {
t.Fatalf("rpc method %s failed, number %d", c.methodName, i)
}
}
}
// Check GetBloombits
_, err := light.GetBloomBits(context.Background(), client.handler.backend.odr, 0, []uint64{0})
if err != nil {
t.Fatalf("Failed to retrieve bloombits of pruned section: %v", err)
}
// Ensure the ODR cached data can be cleaned by pruner.
newPruner(client.db, client.chtIndexer, client.bloomTrieIndexer)
time.Sleep(50 * time.Millisecond) // Ensure pruner have enough time to prune data.
checkPruned(1, config.ChtSize-1) // Ensure all cached data(by odr) is cleaned.
}

@ -79,7 +79,7 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq
func testAccess(t *testing.T, protocol int, fn accessTestFn) { func testAccess(t *testing.T, protocol int, fn accessTestFn) {
// Assemble the test environment // Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true)
defer tearDown() defer tearDown()
// Ensure the client has synced all necessary data. // Ensure the client has synced all necessary data.

@ -77,8 +77,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
iConfig: light.DefaultServerIndexerConfig, iConfig: light.DefaultServerIndexerConfig,
chainDb: e.ChainDb(), chainDb: e.ChainDb(),
chainReader: e.BlockChain(), chainReader: e.BlockChain(),
chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations), chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
}, },
archiveMode: e.ArchiveMode(), archiveMode: e.ArchiveMode(),

@ -54,7 +54,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
} }
} }
// Generate 512+4 blocks (totally 1 CHT sections) // Generate 512+4 blocks (totally 1 CHT sections)
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false) server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false, true)
defer tearDown() defer tearDown()
expected := config.ChtSize + config.ChtConfirms expected := config.ChtSize + config.ChtConfirms
@ -144,7 +144,7 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
} }
} }
// Generate 512+4 blocks (totally 1 CHT sections) // Generate 512+4 blocks (totally 1 CHT sections)
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false) server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true)
defer tearDown() defer tearDown()
expected := config.ChtSize + config.ChtConfirms expected := config.ChtSize + config.ChtConfirms

@ -158,11 +158,11 @@ func prepare(n int, backend *backends.SimulatedBackend) {
} }
// testIndexers creates a set of indexers with specified params for testing purpose. // testIndexers creates a set of indexers with specified params for testing purpose.
func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig) []*core.ChainIndexer { func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig, disablePruning bool) []*core.ChainIndexer {
var indexers [3]*core.ChainIndexer var indexers [3]*core.ChainIndexer
indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms) indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms, disablePruning)
indexers[1] = eth.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms) indexers[1] = eth.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms)
indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize) indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize, disablePruning)
// make bloomTrieIndexer as a child indexer of bloom indexer. // make bloomTrieIndexer as a child indexer of bloom indexer.
indexers[1].AddChildIndexer(indexers[2]) indexers[1].AddChildIndexer(indexers[2])
return indexers[:] return indexers[:]
@ -456,7 +456,7 @@ type testServer struct {
func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, simClock bool, newPeer bool, testCost uint64) (*testServer, func()) { func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, simClock bool, newPeer bool, testCost uint64) (*testServer, func()) {
db := rawdb.NewMemoryDatabase() db := rawdb.NewMemoryDatabase()
indexers := testIndexers(db, nil, light.TestServerIndexerConfig) indexers := testIndexers(db, nil, light.TestServerIndexerConfig, true)
var clock mclock.Clock = &mclock.System{} var clock mclock.Clock = &mclock.System{}
if simClock { if simClock {
@ -499,7 +499,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallba
return server, teardown return server, teardown
} }
func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool) (*testServer, *testClient, func()) { func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool, disablePruning bool) (*testServer, *testClient, func()) {
sdb, cdb := rawdb.NewMemoryDatabase(), rawdb.NewMemoryDatabase() sdb, cdb := rawdb.NewMemoryDatabase(), rawdb.NewMemoryDatabase()
speers, cpeers := newServerPeerSet(), newClientPeerSet() speers, cpeers := newServerPeerSet(), newClientPeerSet()
@ -511,8 +511,8 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
rm := newRetrieveManager(speers, dist, func() time.Duration { return time.Millisecond * 500 }) rm := newRetrieveManager(speers, dist, func() time.Duration { return time.Millisecond * 500 })
odr := NewLesOdr(cdb, light.TestClientIndexerConfig, rm) odr := NewLesOdr(cdb, light.TestClientIndexerConfig, rm)
sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig) sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig, true)
cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig) cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, disablePruning)
scIndexer, sbIndexer, sbtIndexer := sindexers[0], sindexers[1], sindexers[2] scIndexer, sbIndexer, sbtIndexer := sindexers[0], sindexers[1], sindexers[2]
ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2] ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2]
@ -542,7 +542,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
} }
select { select {
case <-done: case <-done:
case <-time.After(3 * time.Second): case <-time.After(10 * time.Second):
t.Fatal("test peer did not connect and sync within 3s") t.Fatal("test peer did not connect and sync within 3s")
} }
} }

@ -138,6 +138,6 @@ func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *en
// newTestLightPeer creates node with light sync mode // newTestLightPeer creates node with light sync mode
func newTestLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) { func newTestLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) {
_, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false) _, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false, true)
return c, teardown return c, teardown
} }

@ -112,7 +112,7 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
if header := bc.GetHeaderByHash(hash); header != nil { if header := bc.GetHeaderByHash(hash); header != nil {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
bc.SetHead(header.Number.Uint64() - 1) bc.SetHead(header.Number.Uint64() - 1)
log.Error("Chain rewind was successful, resuming normal operation") log.Info("Chain rewind was successful, resuming normal operation")
} }
} }
return bc, nil return bc, nil
@ -155,7 +155,11 @@ func (lc *LightChain) loadLastState() error {
// Corrupt or empty database, init from scratch // Corrupt or empty database, init from scratch
lc.Reset() lc.Reset()
} else { } else {
if header := lc.GetHeaderByHash(head); header != nil { header := lc.GetHeaderByHash(head)
if header == nil {
// Corrupt or empty database, init from scratch
lc.Reset()
} else {
lc.hc.SetCurrentHeader(header) lc.hc.SetCurrentHeader(header)
} }
} }
@ -163,7 +167,6 @@ func (lc *LightChain) loadLastState() error {
header := lc.hc.CurrentHeader() header := lc.hc.CurrentHeader()
headerTd := lc.GetTd(header.Hash(), header.Number.Uint64()) headerTd := lc.GetTd(header.Hash(), header.Number.Uint64())
log.Info("Loaded most recent local header", "number", header.Number, "hash", header.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(header.Time), 0))) log.Info("Loaded most recent local header", "number", header.Number, "hash", header.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(header.Time), 0)))
return nil return nil
} }
@ -431,6 +434,17 @@ func (lc *LightChain) GetTdByHash(hash common.Hash) *big.Int {
return lc.hc.GetTdByHash(hash) return lc.hc.GetTdByHash(hash)
} }
// GetHeaderByNumberOdr retrieves the total difficult from the database or
// network by hash and number, caching it (associated with its hash) if found.
func (lc *LightChain) GetTdOdr(ctx context.Context, hash common.Hash, number uint64) *big.Int {
td := lc.GetTd(hash, number)
if td != nil {
return td
}
td, _ = GetTd(ctx, lc.odr, hash, number)
return td
}
// GetHeader retrieves a block header from the database by hash and number, // GetHeader retrieves a block header from the database by hash and number,
// caching it if found. // caching it if found.
func (lc *LightChain) GetHeader(hash common.Hash, number uint64) *types.Header { func (lc *LightChain) GetHeader(hash common.Hash, number uint64) *types.Header {

@ -82,7 +82,6 @@ func StorageTrieID(state *TrieID, addrHash, root common.Hash) *TrieID {
// TrieRequest is the ODR request type for state/storage trie entries // TrieRequest is the ODR request type for state/storage trie entries
type TrieRequest struct { type TrieRequest struct {
OdrRequest
Id *TrieID Id *TrieID
Key []byte Key []byte
Proof *NodeSet Proof *NodeSet
@ -95,7 +94,6 @@ func (req *TrieRequest) StoreResult(db ethdb.Database) {
// CodeRequest is the ODR request type for retrieving contract code // CodeRequest is the ODR request type for retrieving contract code
type CodeRequest struct { type CodeRequest struct {
OdrRequest
Id *TrieID // references storage trie of the account Id *TrieID // references storage trie of the account
Hash common.Hash Hash common.Hash
Data []byte Data []byte
@ -108,9 +106,9 @@ func (req *CodeRequest) StoreResult(db ethdb.Database) {
// BlockRequest is the ODR request type for retrieving block bodies // BlockRequest is the ODR request type for retrieving block bodies
type BlockRequest struct { type BlockRequest struct {
OdrRequest
Hash common.Hash Hash common.Hash
Number uint64 Number uint64
Header *types.Header
Rlp []byte Rlp []byte
} }
@ -119,9 +117,8 @@ func (req *BlockRequest) StoreResult(db ethdb.Database) {
rawdb.WriteBodyRLP(db, req.Hash, req.Number, req.Rlp) rawdb.WriteBodyRLP(db, req.Hash, req.Number, req.Rlp)
} }
// ReceiptsRequest is the ODR request type for retrieving block bodies // ReceiptsRequest is the ODR request type for retrieving receipts.
type ReceiptsRequest struct { type ReceiptsRequest struct {
OdrRequest
Untrusted bool // Indicator whether the result retrieved is trusted or not Untrusted bool // Indicator whether the result retrieved is trusted or not
Hash common.Hash Hash common.Hash
Number uint64 Number uint64
@ -138,7 +135,6 @@ func (req *ReceiptsRequest) StoreResult(db ethdb.Database) {
// ChtRequest is the ODR request type for state/storage trie entries // ChtRequest is the ODR request type for state/storage trie entries
type ChtRequest struct { type ChtRequest struct {
OdrRequest
Untrusted bool // Indicator whether the result retrieved is trusted or not Untrusted bool // Indicator whether the result retrieved is trusted or not
PeerId string // The specified peer id from which to retrieve data. PeerId string // The specified peer id from which to retrieve data.
Config *IndexerConfig Config *IndexerConfig
@ -193,7 +189,6 @@ type TxStatus struct {
// TxStatusRequest is the ODR request type for retrieving transaction status // TxStatusRequest is the ODR request type for retrieving transaction status
type TxStatusRequest struct { type TxStatusRequest struct {
OdrRequest
Hashes []common.Hash Hashes []common.Hash
Status []TxStatus Status []TxStatus
} }

@ -19,6 +19,7 @@ package light
import ( import (
"bytes" "bytes"
"context" "context"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -30,65 +31,83 @@ import (
var sha3Nil = crypto.Keccak256Hash(nil) var sha3Nil = crypto.Keccak256Hash(nil)
// GetHeaderByNumber retrieves the canonical block header corresponding to the
// given number.
func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*types.Header, error) { func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*types.Header, error) {
// Try to find it in the local database first.
db := odr.Database() db := odr.Database()
hash := rawdb.ReadCanonicalHash(db, number) hash := rawdb.ReadCanonicalHash(db, number)
if (hash != common.Hash{}) {
// if there is a canonical hash, there is a header too
header := rawdb.ReadHeader(db, hash, number)
if header == nil {
panic("Canonical hash present but header not found")
}
return header, nil
}
var ( // If there is a canonical hash, there should have a header too.
chtCount, sectionHeadNum uint64 // But if it's pruned, re-fetch from network again.
sectionHead common.Hash if (hash != common.Hash{}) {
) if header := rawdb.ReadHeader(db, hash, number); header != nil {
if odr.ChtIndexer() != nil { return header, nil
chtCount, sectionHeadNum, sectionHead = odr.ChtIndexer().Sections()
canonicalHash := rawdb.ReadCanonicalHash(db, sectionHeadNum)
// if the CHT was injected as a trusted checkpoint, we have no canonical hash yet so we accept zero hash too
for chtCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
chtCount--
if chtCount > 0 {
sectionHeadNum = chtCount*odr.IndexerConfig().ChtSize - 1
sectionHead = odr.ChtIndexer().SectionHead(chtCount - 1)
canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
}
} }
} }
if number >= chtCount*odr.IndexerConfig().ChtSize { // Retrieve the header via ODR, ensure the requested header is covered
// by local trusted CHT.
chts, _, chtHead := odr.ChtIndexer().Sections()
if number >= chts*odr.IndexerConfig().ChtSize {
return nil, errNoTrustedCht return nil, errNoTrustedCht
} }
r := &ChtRequest{ChtRoot: GetChtRoot(db, chtCount-1, sectionHead), ChtNum: chtCount - 1, BlockNum: number, Config: odr.IndexerConfig()} r := &ChtRequest{
ChtRoot: GetChtRoot(db, chts-1, chtHead),
ChtNum: chts - 1,
BlockNum: number,
Config: odr.IndexerConfig(),
}
if err := odr.Retrieve(ctx, r); err != nil { if err := odr.Retrieve(ctx, r); err != nil {
return nil, err return nil, err
} }
return r.Header, nil return r.Header, nil
} }
// GetUntrustedHeaderByNumber fetches specified block header without correctness checking. // GetUntrustedHeaderByNumber retrieves specified block header without
// Note this function should only be used in light client checkpoint syncing. // correctness checking. Note this function should only be used in light
// client checkpoint syncing.
func GetUntrustedHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64, peerId string) (*types.Header, error) { func GetUntrustedHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64, peerId string) (*types.Header, error) {
r := &ChtRequest{BlockNum: number, ChtNum: number / odr.IndexerConfig().ChtSize, Untrusted: true, PeerId: peerId, Config: odr.IndexerConfig()} // todo(rjl493456442) it's a hack to retrieve headers which is not covered
// by CHT. Fix it in LES4
r := &ChtRequest{
BlockNum: number,
ChtNum: number / odr.IndexerConfig().ChtSize,
Untrusted: true,
PeerId: peerId,
Config: odr.IndexerConfig(),
}
if err := odr.Retrieve(ctx, r); err != nil { if err := odr.Retrieve(ctx, r); err != nil {
return nil, err return nil, err
} }
return r.Header, nil return r.Header, nil
} }
// GetCanonicalHash retrieves the canonical block hash corresponding to the number.
func GetCanonicalHash(ctx context.Context, odr OdrBackend, number uint64) (common.Hash, error) { func GetCanonicalHash(ctx context.Context, odr OdrBackend, number uint64) (common.Hash, error) {
hash := rawdb.ReadCanonicalHash(odr.Database(), number) hash := rawdb.ReadCanonicalHash(odr.Database(), number)
if (hash != common.Hash{}) { if hash != (common.Hash{}) {
return hash, nil return hash, nil
} }
header, err := GetHeaderByNumber(ctx, odr, number) header, err := GetHeaderByNumber(ctx, odr, number)
if header != nil { if err != nil {
return header.Hash(), nil return common.Hash{}, err
}
// number -> canonical mapping already be stored in db, get it.
return header.Hash(), nil
}
// GetTd retrieves the total difficulty corresponding to the number and hash.
func GetTd(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*big.Int, error) {
td := rawdb.ReadTd(odr.Database(), hash, number)
if td != nil {
return td, nil
}
_, err := GetHeaderByNumber(ctx, odr, number)
if err != nil {
return nil, err
} }
return common.Hash{}, err // <hash, number> -> td mapping already be stored in db, get it.
return rawdb.ReadTd(odr.Database(), hash, number), nil
} }
// GetBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. // GetBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
@ -96,15 +115,19 @@ func GetBodyRLP(ctx context.Context, odr OdrBackend, hash common.Hash, number ui
if data := rawdb.ReadBodyRLP(odr.Database(), hash, number); data != nil { if data := rawdb.ReadBodyRLP(odr.Database(), hash, number); data != nil {
return data, nil return data, nil
} }
r := &BlockRequest{Hash: hash, Number: number} // Retrieve the block header first and pass it for verification.
header, err := GetHeaderByNumber(ctx, odr, number)
if err != nil {
return nil, errNoHeader
}
r := &BlockRequest{Hash: hash, Number: number, Header: header}
if err := odr.Retrieve(ctx, r); err != nil { if err := odr.Retrieve(ctx, r); err != nil {
return nil, err return nil, err
} else {
return r.Rlp, nil
} }
return r.Rlp, nil
} }
// GetBody retrieves the block body (transactons, uncles) corresponding to the // GetBody retrieves the block body (transactions, uncles) corresponding to the
// hash. // hash.
func GetBody(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*types.Body, error) { func GetBody(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*types.Body, error) {
data, err := GetBodyRLP(ctx, odr, hash, number) data, err := GetBodyRLP(ctx, odr, hash, number)
@ -122,8 +145,8 @@ func GetBody(ctx context.Context, odr OdrBackend, hash common.Hash, number uint6
// back from the stored header and body. // back from the stored header and body.
func GetBlock(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*types.Block, error) { func GetBlock(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64) (*types.Block, error) {
// Retrieve the block header and body contents // Retrieve the block header and body contents
header := rawdb.ReadHeader(odr.Database(), hash, number) header, err := GetHeaderByNumber(ctx, odr, number)
if header == nil { if err != nil {
return nil, errNoHeader return nil, errNoHeader
} }
body, err := GetBody(ctx, odr, hash, number) body, err := GetBody(ctx, odr, hash, number)
@ -140,7 +163,11 @@ func GetBlockReceipts(ctx context.Context, odr OdrBackend, hash common.Hash, num
// Assume receipts are already stored locally and attempt to retrieve. // Assume receipts are already stored locally and attempt to retrieve.
receipts := rawdb.ReadRawReceipts(odr.Database(), hash, number) receipts := rawdb.ReadRawReceipts(odr.Database(), hash, number)
if receipts == nil { if receipts == nil {
r := &ReceiptsRequest{Hash: hash, Number: number} header, err := GetHeaderByNumber(ctx, odr, number)
if err != nil {
return nil, errNoHeader
}
r := &ReceiptsRequest{Hash: hash, Number: number, Header: header}
if err := odr.Retrieve(ctx, r); err != nil { if err := odr.Retrieve(ctx, r); err != nil {
return nil, err return nil, err
} }
@ -171,7 +198,6 @@ func GetBlockLogs(ctx context.Context, odr OdrBackend, hash common.Hash, number
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Return the logs without deriving any computed fields on the receipts
logs := make([][]*types.Log, len(receipts)) logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts { for i, receipt := range receipts {
logs[i] = receipt.Logs logs[i] = receipt.Logs
@ -203,64 +229,51 @@ func GetUntrustedBlockLogs(ctx context.Context, odr OdrBackend, header *types.He
return logs, nil return logs, nil
} }
// GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to the given bit index and section indexes // GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to
func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxList []uint64) ([][]byte, error) { // the given bit index and section indexes.
var ( func GetBloomBits(ctx context.Context, odr OdrBackend, bit uint, sections []uint64) ([][]byte, error) {
db = odr.Database()
result = make([][]byte, len(sectionIdxList))
reqList []uint64
reqIdx []int
)
var ( var (
bloomTrieCount, sectionHeadNum uint64 reqIndex []int
sectionHead common.Hash reqSections []uint64
db = odr.Database()
result = make([][]byte, len(sections))
) )
if odr.BloomTrieIndexer() != nil { blooms, _, sectionHead := odr.BloomTrieIndexer().Sections()
bloomTrieCount, sectionHeadNum, sectionHead = odr.BloomTrieIndexer().Sections() for i, section := range sections {
canonicalHash := rawdb.ReadCanonicalHash(db, sectionHeadNum) sectionHead := rawdb.ReadCanonicalHash(db, (section+1)*odr.IndexerConfig().BloomSize-1)
// if the BloomTrie was injected as a trusted checkpoint, we have no canonical hash yet so we accept zero hash too // If we don't have the canonical hash stored for this section head number,
for bloomTrieCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) { // we'll still look for an entry with a zero sectionHead (we store it with
bloomTrieCount-- // zero section head too if we don't know it at the time of the retrieval)
if bloomTrieCount > 0 { if bloomBits, _ := rawdb.ReadBloomBits(db, bit, section, sectionHead); len(bloomBits) != 0 {
sectionHeadNum = bloomTrieCount*odr.IndexerConfig().BloomTrieSize - 1
sectionHead = odr.BloomTrieIndexer().SectionHead(bloomTrieCount - 1)
canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
}
}
}
for i, sectionIdx := range sectionIdxList {
sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*odr.IndexerConfig().BloomSize-1)
// if we don't have the canonical hash stored for this section head number, we'll still look for
// an entry with a zero sectionHead (we store it with zero section head too if we don't know it
// at the time of the retrieval)
bloomBits, err := rawdb.ReadBloomBits(db, bitIdx, sectionIdx, sectionHead)
if err == nil {
result[i] = bloomBits result[i] = bloomBits
} else { continue
// TODO(rjl493456442) Convert sectionIndex to BloomTrie relative index
if sectionIdx >= bloomTrieCount {
return nil, errNoTrustedBloomTrie
}
reqList = append(reqList, sectionIdx)
reqIdx = append(reqIdx, i)
} }
// TODO(rjl493456442) Convert sectionIndex to BloomTrie relative index
if section >= blooms {
return nil, errNoTrustedBloomTrie
}
reqSections = append(reqSections, section)
reqIndex = append(reqIndex, i)
} }
if reqList == nil { // Find all bloombits in database, nothing to query via odr, return.
if reqSections == nil {
return result, nil return result, nil
} }
// Send odr request to retrieve missing bloombits.
r := &BloomRequest{BloomTrieRoot: GetBloomTrieRoot(db, bloomTrieCount-1, sectionHead), BloomTrieNum: bloomTrieCount - 1, r := &BloomRequest{
BitIdx: bitIdx, SectionIndexList: reqList, Config: odr.IndexerConfig()} BloomTrieRoot: GetBloomTrieRoot(db, blooms-1, sectionHead),
BloomTrieNum: blooms - 1,
BitIdx: bit,
SectionIndexList: reqSections,
Config: odr.IndexerConfig(),
}
if err := odr.Retrieve(ctx, r); err != nil { if err := odr.Retrieve(ctx, r); err != nil {
return nil, err return nil, err
} else {
for i, idx := range reqIdx {
result[idx] = r.BloomBits[i]
}
return result, nil
} }
for i, idx := range reqIndex {
result[idx] = r.BloomBits[i]
}
return result, nil
} }
// GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain // GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain
@ -268,17 +281,16 @@ func GetTransaction(ctx context.Context, odr OdrBackend, txHash common.Hash) (*t
r := &TxStatusRequest{Hashes: []common.Hash{txHash}} r := &TxStatusRequest{Hashes: []common.Hash{txHash}}
if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded { if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded {
return nil, common.Hash{}, 0, 0, err return nil, common.Hash{}, 0, 0, err
} else {
pos := r.Status[0].Lookup
// first ensure that we have the header, otherwise block body retrieval will fail
// also verify if this is a canonical block by getting the header by number and checking its hash
if header, err := GetHeaderByNumber(ctx, odr, pos.BlockIndex); err != nil || header.Hash() != pos.BlockHash {
return nil, common.Hash{}, 0, 0, err
}
if body, err := GetBody(ctx, odr, pos.BlockHash, pos.BlockIndex); err != nil || uint64(len(body.Transactions)) <= pos.Index || body.Transactions[pos.Index].Hash() != txHash {
return nil, common.Hash{}, 0, 0, err
} else {
return body.Transactions[pos.Index], pos.BlockHash, pos.BlockIndex, pos.Index, nil
}
} }
pos := r.Status[0].Lookup
// first ensure that we have the header, otherwise block body retrieval will fail
// also verify if this is a canonical block by getting the header by number and checking its hash
if header, err := GetHeaderByNumber(ctx, odr, pos.BlockIndex); err != nil || header.Hash() != pos.BlockHash {
return nil, common.Hash{}, 0, 0, err
}
body, err := GetBody(ctx, odr, pos.BlockHash, pos.BlockIndex)
if err != nil || uint64(len(body.Transactions)) <= pos.Index || body.Transactions[pos.Index].Hash() != txHash {
return nil, common.Hash{}, 0, 0, err
}
return body.Transactions[pos.Index], pos.BlockHash, pos.BlockIndex, pos.Index, nil
} }

@ -17,6 +17,7 @@
package light package light
import ( import (
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"errors" "errors"
@ -24,6 +25,7 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -128,23 +130,27 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common
// ChtIndexerBackend implements core.ChainIndexerBackend. // ChtIndexerBackend implements core.ChainIndexerBackend.
type ChtIndexerBackend struct { type ChtIndexerBackend struct {
disablePruning bool
diskdb, trieTable ethdb.Database diskdb, trieTable ethdb.Database
odr OdrBackend odr OdrBackend
triedb *trie.Database triedb *trie.Database
trieset mapset.Set
section, sectionSize uint64 section, sectionSize uint64
lastHash common.Hash lastHash common.Hash
trie *trie.Trie trie *trie.Trie
} }
// NewChtIndexer creates a Cht chain indexer // NewChtIndexer creates a Cht chain indexer
func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64) *core.ChainIndexer { func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, disablePruning bool) *core.ChainIndexer {
trieTable := rawdb.NewTable(db, ChtTablePrefix) trieTable := rawdb.NewTable(db, ChtTablePrefix)
backend := &ChtIndexerBackend{ backend := &ChtIndexerBackend{
diskdb: db, diskdb: db,
odr: odr, odr: odr,
trieTable: trieTable, trieTable: trieTable,
triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
sectionSize: size, trieset: mapset.NewSet(),
sectionSize: size,
disablePruning: disablePruning,
} }
return core.NewChainIndexer(db, rawdb.NewTable(db, "chtIndexV2-"), backend, size, confirms, time.Millisecond*100, "cht") return core.NewChainIndexer(db, rawdb.NewTable(db, "chtIndexV2-"), backend, size, confirms, time.Millisecond*100, "cht")
} }
@ -189,7 +195,6 @@ func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSecti
c.trie, err = trie.New(root, c.triedb) c.trie, err = trie.New(root, c.triedb)
} }
} }
c.section = section c.section = section
return err return err
} }
@ -216,13 +221,83 @@ func (c *ChtIndexerBackend) Commit() error {
if err != nil { if err != nil {
return err return err
} }
c.triedb.Commit(root, false) // Pruning historical trie nodes if necessary.
if !c.disablePruning {
// Flush the triedb and track the latest trie nodes.
c.trieset.Clear()
c.triedb.Commit(root, false, func(hash common.Hash) { c.trieset.Add(hash) })
it := c.trieTable.NewIterator(nil, nil)
defer it.Release()
var (
deleted int
remaining int
t = time.Now()
)
for it.Next() {
trimmed := bytes.TrimPrefix(it.Key(), []byte(ChtTablePrefix))
if !c.trieset.Contains(common.BytesToHash(trimmed)) {
c.trieTable.Delete(trimmed)
deleted += 1
} else {
remaining += 1
}
}
log.Debug("Prune historical CHT trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
} else {
c.triedb.Commit(root, false, nil)
}
log.Info("Storing CHT", "section", c.section, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root)) log.Info("Storing CHT", "section", c.section, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
StoreChtRoot(c.diskdb, c.section, c.lastHash, root) StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
return nil return nil
} }
// PruneSections implements core.ChainIndexerBackend which deletes all
// chain data(except hash<->number mappings) older than the specified
// threshold.
func (c *ChtIndexerBackend) Prune(threshold uint64) error {
// Short circuit if the light pruning is disabled.
if c.disablePruning {
return nil
}
t := time.Now()
// Always keep genesis header in database.
start, end := uint64(1), (threshold+1)*c.sectionSize
var batch = c.diskdb.NewBatch()
for {
numbers, hashes := rawdb.ReadAllCanonicalHashes(c.diskdb, start, end, 10240)
if len(numbers) == 0 {
break
}
for i := 0; i < len(numbers); i++ {
// Keep hash<->number mapping in database otherwise the hash based
// API(e.g. GetReceipt, GetLogs) will be broken.
//
// Storage size wise, the size of a mapping is ~41bytes. For one
// section is about 1.3MB which is acceptable.
//
// In order to totally get rid of this index, we need an additional
// flag to specify how many historical data light client can serve.
rawdb.DeleteCanonicalHash(batch, numbers[i])
rawdb.DeleteBlockWithoutNumber(batch, hashes[i], numbers[i])
}
if batch.ValueSize() > ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
}
start = numbers[len(numbers)-1] + 1
}
if err := batch.Write(); err != nil {
return err
}
log.Debug("Prune history headers", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(t)))
return nil
}
var ( var (
bloomTriePrefix = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash bloomTriePrefix = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash
BloomTrieTablePrefix = "blt-" BloomTrieTablePrefix = "blt-"
@ -245,8 +320,10 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root
// BloomTrieIndexerBackend implements core.ChainIndexerBackend // BloomTrieIndexerBackend implements core.ChainIndexerBackend
type BloomTrieIndexerBackend struct { type BloomTrieIndexerBackend struct {
disablePruning bool
diskdb, trieTable ethdb.Database diskdb, trieTable ethdb.Database
triedb *trie.Database triedb *trie.Database
trieset mapset.Set
odr OdrBackend odr OdrBackend
section uint64 section uint64
parentSize uint64 parentSize uint64
@ -257,15 +334,17 @@ type BloomTrieIndexerBackend struct {
} }
// NewBloomTrieIndexer creates a BloomTrie chain indexer // NewBloomTrieIndexer creates a BloomTrie chain indexer
func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64) *core.ChainIndexer { func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64, disablePruning bool) *core.ChainIndexer {
trieTable := rawdb.NewTable(db, BloomTrieTablePrefix) trieTable := rawdb.NewTable(db, BloomTrieTablePrefix)
backend := &BloomTrieIndexerBackend{ backend := &BloomTrieIndexerBackend{
diskdb: db, diskdb: db,
odr: odr, odr: odr,
trieTable: trieTable, trieTable: trieTable,
triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
parentSize: parentSize, trieset: mapset.NewSet(),
size: size, parentSize: parentSize,
size: size,
disablePruning: disablePruning,
} }
backend.bloomTrieRatio = size / parentSize backend.bloomTrieRatio = size / parentSize
backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio) backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
@ -303,7 +382,6 @@ func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section
} }
}() }()
} }
for i := uint(0); i < types.BloomBitLength; i++ { for i := uint(0); i < types.BloomBitLength; i++ {
indexCh <- i indexCh <- i
} }
@ -380,10 +458,51 @@ func (b *BloomTrieIndexerBackend) Commit() error {
if err != nil { if err != nil {
return err return err
} }
b.triedb.Commit(root, false) // Pruning historical trie nodes if necessary.
if !b.disablePruning {
// Flush the triedb and track the latest trie nodes.
b.trieset.Clear()
b.triedb.Commit(root, false, func(hash common.Hash) { b.trieset.Add(hash) })
it := b.trieTable.NewIterator(nil, nil)
defer it.Release()
var (
deleted int
remaining int
t = time.Now()
)
for it.Next() {
trimmed := bytes.TrimPrefix(it.Key(), []byte(BloomTrieTablePrefix))
if !b.trieset.Contains(common.BytesToHash(trimmed)) {
b.trieTable.Delete(trimmed)
deleted += 1
} else {
remaining += 1
}
}
log.Debug("Prune historical bloom trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
} else {
b.triedb.Commit(root, false, nil)
}
sectionHead := b.sectionHeads[b.bloomTrieRatio-1] sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root) StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
return nil
}
// Prune implements core.ChainIndexerBackend which deletes all
// bloombits which older than the specified threshold.
func (b *BloomTrieIndexerBackend) Prune(threshold uint64) error {
// Short circuit if the light pruning is disabled.
if b.disablePruning {
return nil
}
start := time.Now()
for i := uint(0); i < types.BloomBitLength; i++ {
rawdb.DeleteBloombits(b.diskdb, i, 0, threshold*b.bloomTrieRatio+b.bloomTrieRatio)
}
log.Debug("Prune history bloombits", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(start)))
return nil return nil
} }

@ -53,9 +53,15 @@ const (
// CheckpointProcessConfirmations is the number before a checkpoint is generated // CheckpointProcessConfirmations is the number before a checkpoint is generated
CheckpointProcessConfirmations = 256 CheckpointProcessConfirmations = 256
// ImmutabilityThreshold is the number of blocks after which a chain segment is // FullImmutabilityThreshold is the number of blocks after which a chain segment is
// considered immutable (i.e. soft finality). It is used by the downloader as a // considered immutable (i.e. soft finality). It is used by the downloader as a
// hard limit against deep ancestors, by the blockchain against deep reorgs, by // hard limit against deep ancestors, by the blockchain against deep reorgs, by
// the freezer as the cutoff threshold and by clique as the snapshot trust limit. // the freezer as the cutoff threshold and by clique as the snapshot trust limit.
ImmutabilityThreshold = 90000 FullImmutabilityThreshold = 90000
// LightImmutabilityThreshold is the number of blocks after which a header chain
// segment is considered immutable for light client(i.e. soft finality). It is used by
// the downloader as a hard limit against deep ancestors, by the blockchain against deep
// reorgs, by the light pruner as the pruning validity guarantee.
LightImmutabilityThreshold = 30000
) )

@ -693,7 +693,7 @@ func (db *Database) Cap(limit common.StorageSize) error {
// //
// Note, this method is a non-synchronized mutator. It is unsafe to call this // Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators. // concurrently with other mutators.
func (db *Database) Commit(node common.Hash, report bool) error { func (db *Database) Commit(node common.Hash, report bool, callback func(common.Hash)) error {
// Create a database batch to flush persistent data out. It is important that // Create a database batch to flush persistent data out. It is important that
// outside code doesn't see an inconsistent state (referenced data removed from // outside code doesn't see an inconsistent state (referenced data removed from
// memory cache during commit but not yet in persistent storage). This is ensured // memory cache during commit but not yet in persistent storage). This is ensured
@ -732,7 +732,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
nodes, storage := len(db.dirties), db.dirtiesSize nodes, storage := len(db.dirties), db.dirtiesSize
uncacher := &cleaner{db} uncacher := &cleaner{db}
if err := db.commit(node, batch, uncacher); err != nil { if err := db.commit(node, batch, uncacher, callback); err != nil {
log.Error("Failed to commit trie from trie database", "err", err) log.Error("Failed to commit trie from trie database", "err", err)
return err return err
} }
@ -771,7 +771,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
} }
// commit is the private locked version of Commit. // commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error { func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error {
// If the node does not exist, it's a previously committed node // If the node does not exist, it's a previously committed node
node, ok := db.dirties[hash] node, ok := db.dirties[hash]
if !ok { if !ok {
@ -780,7 +780,7 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane
var err error var err error
node.forChilds(func(child common.Hash) { node.forChilds(func(child common.Hash) {
if err == nil { if err == nil {
err = db.commit(child, batch, uncacher) err = db.commit(child, batch, uncacher, callback)
} }
}) })
if err != nil { if err != nil {
@ -789,6 +789,9 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane
if err := batch.Put(hash[:], node.rlp()); err != nil { if err := batch.Put(hash[:], node.rlp()); err != nil {
return err return err
} }
if callback != nil {
callback(hash)
}
// If we've reached an optimal batch size, commit and start over // If we've reached an optimal batch size, commit and start over
if batch.ValueSize() >= ethdb.IdealBatchSize { if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {

@ -301,7 +301,7 @@ func testIteratorContinueAfterError(t *testing.T, memonly bool) {
} }
tr.Commit(nil) tr.Commit(nil)
if !memonly { if !memonly {
triedb.Commit(tr.Hash(), true) triedb.Commit(tr.Hash(), true, nil)
} }
wantNodeCount := checkIteratorNoDups(t, tr.NodeIterator(nil), nil) wantNodeCount := checkIteratorNoDups(t, tr.NodeIterator(nil), nil)
@ -392,7 +392,7 @@ func testIteratorContinueAfterSeekError(t *testing.T, memonly bool) {
} }
root, _ := ctr.Commit(nil) root, _ := ctr.Commit(nil)
if !memonly { if !memonly {
triedb.Commit(root, true) triedb.Commit(root, true, nil)
} }
barNodeHash := common.HexToHash("05041990364eb72fcb1127652ce40d8bab765f2bfe53225b1170d276cc101c2e") barNodeHash := common.HexToHash("05041990364eb72fcb1127652ce40d8bab765f2bfe53225b1170d276cc101c2e")
var ( var (

@ -88,7 +88,7 @@ func testMissingNode(t *testing.T, memonly bool) {
updateString(trie, "123456", "asdfasdfasdfasdfasdfasdfasdfasdf") updateString(trie, "123456", "asdfasdfasdfasdfasdfasdfasdfasdf")
root, _ := trie.Commit(nil) root, _ := trie.Commit(nil)
if !memonly { if !memonly {
triedb.Commit(root, true) triedb.Commit(root, true, nil)
} }
trie, _ = New(root, triedb) trie, _ = New(root, triedb)

Loading…
Cancel
Save