From 87c0ba92136a75db0ab2aba1046d4a9860375d6a Mon Sep 17 00:00:00 2001 From: gary rong Date: Fri, 21 Aug 2020 20:10:40 +0800 Subject: [PATCH] core, eth, les, trie: add a prefix to contract code (#21080) --- cmd/evm/internal/t8ntool/execution.go | 5 +- cmd/geth/retesteth.go | 2 +- consensus/clique/clique.go | 3 +- consensus/ethash/consensus.go | 3 +- core/block_validator.go | 5 +- core/blockchain.go | 25 +++- core/blockchain_test.go | 5 +- core/genesis.go | 3 +- core/rawdb/accessors_chain_test.go | 4 +- core/rawdb/accessors_indexes_test.go | 28 +++- core/rawdb/accessors_metadata.go | 17 --- core/rawdb/accessors_state.go | 96 +++++++++++++ core/rawdb/chain_iterator_test.go | 4 +- core/rawdb/database.go | 4 + core/rawdb/schema.go | 16 +++ core/state/database.go | 35 ++++- core/state/iterator_test.go | 6 +- core/state/statedb.go | 21 +-- core/state/sync.go | 2 +- core/state/sync_test.go | 95 +++++++++---- core/tx_pool_test.go | 3 +- core/types/block.go | 8 +- core/types/block_test.go | 28 +++- core/types/derive_sha.go | 17 ++- eth/downloader/downloader.go | 2 +- eth/downloader/queue.go | 5 +- eth/downloader/statesync.go | 8 +- eth/fetcher/block_fetcher.go | 5 +- eth/fetcher/block_fetcher_test.go | 3 +- eth/handler.go | 11 +- les/odr_requests.go | 4 +- les/server_handler.go | 2 +- light/odr.go | 2 +- light/odr_test.go | 2 +- light/trie.go | 4 +- miner/worker.go | 2 + trie/database.go | 129 ++++++------------ trie/secure_trie.go | 3 +- trie/sync.go | 187 ++++++++++++++++---------- trie/sync_bloom.go | 19 ++- trie/sync_test.go | 36 +++-- trie/trie.go | 6 + 42 files changed, 579 insertions(+), 286 deletions(-) create mode 100644 core/rawdb/accessors_state.go diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index 0fd6b869fc..75586d588b 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" "golang.org/x/crypto/sha3" ) @@ -220,8 +221,8 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, } execRs := &ExecutionResult{ StateRoot: root, - TxRoot: types.DeriveSha(includedTxs), - ReceiptRoot: types.DeriveSha(receipts), + TxRoot: types.DeriveSha(includedTxs, new(trie.Trie)), + ReceiptRoot: types.DeriveSha(receipts, new(trie.Trie)), Bloom: types.CreateBloom(receipts), LogsHash: rlpHash(statedb.Logs()), Receipts: receipts, diff --git a/cmd/geth/retesteth.go b/cmd/geth/retesteth.go index f4ec832789..1d4c15d1e6 100644 --- a/cmd/geth/retesteth.go +++ b/cmd/geth/retesteth.go @@ -248,7 +248,7 @@ func (e *NoRewardEngine) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header.Root = statedb.IntermediateRoot(chain.Config().IsEIP158(header.Number)) // Header seems complete, assemble into a block and return - return types.NewBlock(header, txs, uncles, receipts), nil + return types.NewBlock(header, txs, uncles, receipts, new(trie.Trie)), nil } } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index a2e61bbc05..02f2451133 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -39,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/trie" lru "github.com/hashicorp/golang-lru" "golang.org/x/crypto/sha3" ) @@ -565,7 +566,7 @@ func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header * header.UncleHash = types.CalcUncleHash(nil) // Assemble and return the final block for sealing - return types.NewBlock(header, txs, nil, receipts), nil + return types.NewBlock(header, txs, nil, receipts, new(trie.Trie)), nil } // Authorize injects a private key into the consensus engine to mint new blocks diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index bbc554951d..bdc02098af 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" "golang.org/x/crypto/sha3" ) @@ -583,7 +584,7 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) // Header seems complete, assemble into a block and return - return types.NewBlock(header, txs, uncles, receipts), nil + return types.NewBlock(header, txs, uncles, receipts, new(trie.Trie)), nil } // SealHash returns the hash of a block prior to it being sealed. diff --git a/core/block_validator.go b/core/block_validator.go index b36ca56d7f..b7af12ff9e 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) // BlockValidator is responsible for validating block headers, uncles and @@ -61,7 +62,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash { return fmt.Errorf("uncle root hash mismatch: have %x, want %x", hash, header.UncleHash) } - if hash := types.DeriveSha(block.Transactions()); hash != header.TxHash { + if hash := types.DeriveSha(block.Transactions(), new(trie.Trie)); hash != header.TxHash { return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash) } if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { @@ -89,7 +90,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom) } // Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, R1]])) - receiptSha := types.DeriveSha(receipts) + receiptSha := types.DeriveSha(receipts, new(trie.Trie)) if receiptSha != header.ReceiptHash { return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha) } diff --git a/core/blockchain.go b/core/blockchain.go index 9dc1fa9c65..8434d2193f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -112,7 +112,10 @@ const ( // - Version 7 // The following incompatible database changes were added: // * Use freezer as the ancient database to maintain all ancient data - BlockChainVersion uint64 = 7 + // - Version 8 + // The following incompatible database changes were added: + // * New scheme for contract code in order to separate the codes and trie nodes + BlockChainVersion uint64 = 8 ) // CacheConfig contains the configuration values for the trie caching/pruning @@ -895,12 +898,30 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types. return uncles } -// TrieNode retrieves a blob of data associated with a trie node (or code hash) +// TrieNode retrieves a blob of data associated with a trie node // either from ephemeral in-memory cache, or from persistent storage. func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) { return bc.stateCache.TrieDB().Node(hash) } +// ContractCode retrieves a blob of data associated with a contract hash +// either from ephemeral in-memory cache, or from persistent storage. +func (bc *BlockChain) ContractCode(hash common.Hash) ([]byte, error) { + return bc.stateCache.ContractCode(common.Hash{}, hash) +} + +// ContractCodeWithPrefix retrieves a blob of data associated with a contract +// hash either from ephemeral in-memory cache, or from persistent storage. +// +// If the code doesn't exist in the in-memory cache, check the storage with +// new code scheme. +func (bc *BlockChain) ContractCodeWithPrefix(hash common.Hash) ([]byte, error) { + type codeReader interface { + ContractCodeWithPrefix(addrHash, codeHash common.Hash) ([]byte, error) + } + return bc.stateCache.(codeReader).ContractCodeWithPrefix(common.Hash{}, hash) +} + // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 41fc4920ca..7ec62b11dd 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) // So we can deterministically seed different blockchains @@ -681,12 +682,12 @@ func TestFastVsFullChains(t *testing.T) { } if fblock, arblock, anblock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash), ancient.GetBlockByHash(hash); fblock.Hash() != arblock.Hash() || anblock.Hash() != arblock.Hash() { t.Errorf("block #%d [%x]: block mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock, anblock, arblock) - } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(arblock.Transactions()) || types.DeriveSha(anblock.Transactions()) != types.DeriveSha(arblock.Transactions()) { + } else if types.DeriveSha(fblock.Transactions(), new(trie.Trie)) != types.DeriveSha(arblock.Transactions(), new(trie.Trie)) || types.DeriveSha(anblock.Transactions(), new(trie.Trie)) != types.DeriveSha(arblock.Transactions(), new(trie.Trie)) { t.Errorf("block #%d [%x]: transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions()) } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) { t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles()) } - if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) { + if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, new(trie.Trie)) != types.DeriveSha(areceipts, new(trie.Trie)) { t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts) } } diff --git a/core/genesis.go b/core/genesis.go index a4790854b6..4525b9c174 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) //go:generate gencodec -type Genesis -field-override genesisSpecMarshaling -out gen_genesis.go @@ -287,7 +288,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { statedb.Commit(false) statedb.Database().TrieDB().Commit(root, true, nil) - return types.NewBlock(head, nil, nil, nil) + return types.NewBlock(head, nil, nil, nil, new(trie.Trie)) } // Commit writes the block and state of a genesis specification to the database. diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 3eba2a3b4e..074c24d8fe 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -84,7 +84,7 @@ func TestBodyStorage(t *testing.T) { WriteBody(db, hash, 0, body) if entry := ReadBody(db, hash, 0); entry == nil { t.Fatalf("Stored body not found") - } else if types.DeriveSha(types.Transactions(entry.Transactions)) != types.DeriveSha(types.Transactions(body.Transactions)) || types.CalcUncleHash(entry.Uncles) != types.CalcUncleHash(body.Uncles) { + } else if types.DeriveSha(types.Transactions(entry.Transactions), newHasher()) != types.DeriveSha(types.Transactions(body.Transactions), newHasher()) || types.CalcUncleHash(entry.Uncles) != types.CalcUncleHash(body.Uncles) { t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, body) } if entry := ReadBodyRLP(db, hash, 0); entry == nil { @@ -138,7 +138,7 @@ func TestBlockStorage(t *testing.T) { } if entry := ReadBody(db, block.Hash(), block.NumberU64()); entry == nil { t.Fatalf("Stored body not found") - } else if types.DeriveSha(types.Transactions(entry.Transactions)) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles) != types.CalcUncleHash(block.Uncles()) { + } else if types.DeriveSha(types.Transactions(entry.Transactions), newHasher()) != types.DeriveSha(block.Transactions(), newHasher()) || types.CalcUncleHash(entry.Uncles) != types.CalcUncleHash(block.Uncles()) { t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body()) } // Delete the block and verify the execution diff --git a/core/rawdb/accessors_indexes_test.go b/core/rawdb/accessors_indexes_test.go index 49d00f9900..87338c62bf 100644 --- a/core/rawdb/accessors_indexes_test.go +++ b/core/rawdb/accessors_indexes_test.go @@ -18,6 +18,7 @@ package rawdb import ( "bytes" + "hash" "math/big" "testing" @@ -26,8 +27,33 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" ) +// testHasher is the helper tool for transaction/receipt list hashing. +// The original hasher is trie, in order to get rid of import cycle, +// use the testing hasher instead. +type testHasher struct { + hasher hash.Hash +} + +func newHasher() *testHasher { + return &testHasher{hasher: sha3.NewLegacyKeccak256()} +} + +func (h *testHasher) Reset() { + h.hasher.Reset() +} + +func (h *testHasher) Update(key, val []byte) { + h.hasher.Write(key) + h.hasher.Write(val) +} + +func (h *testHasher) Hash() common.Hash { + return common.BytesToHash(h.hasher.Sum(nil)) +} + // Tests that positional lookup metadata can be stored and retrieved. func TestLookupStorage(t *testing.T) { tests := []struct { @@ -73,7 +99,7 @@ func TestLookupStorage(t *testing.T) { tx3 := types.NewTransaction(3, common.BytesToAddress([]byte{0x33}), big.NewInt(333), 3333, big.NewInt(33333), []byte{0x33, 0x33, 0x33}) txs := []*types.Transaction{tx1, tx2, tx3} - block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, txs, nil, nil) + block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, txs, nil, nil, newHasher()) // Check that no transactions entries are in a pristine database for i, tx := range txs { diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index f8d09fbddf..14a302a127 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -79,20 +79,3 @@ func WriteChainConfig(db ethdb.KeyValueWriter, hash common.Hash, cfg *params.Cha log.Crit("Failed to store chain config", "err", err) } } - -// ReadPreimage retrieves a single preimage of the provided hash. -func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { - data, _ := db.Get(preimageKey(hash)) - return data -} - -// WritePreimages writes the provided set of preimages to the database. -func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { - for hash, preimage := range preimages { - if err := db.Put(preimageKey(hash), preimage); err != nil { - log.Crit("Failed to store trie preimage", "err", err) - } - } - preimageCounter.Inc(int64(len(preimages))) - preimageHitCounter.Inc(int64(len(preimages))) -} diff --git a/core/rawdb/accessors_state.go b/core/rawdb/accessors_state.go new file mode 100644 index 0000000000..6112de03ad --- /dev/null +++ b/core/rawdb/accessors_state.go @@ -0,0 +1,96 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// ReadPreimage retrieves a single preimage of the provided hash. +func ReadPreimage(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(preimageKey(hash)) + return data +} + +// WritePreimages writes the provided set of preimages to the database. +func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { + for hash, preimage := range preimages { + if err := db.Put(preimageKey(hash), preimage); err != nil { + log.Crit("Failed to store trie preimage", "err", err) + } + } + preimageCounter.Inc(int64(len(preimages))) + preimageHitCounter.Inc(int64(len(preimages))) +} + +// ReadCode retrieves the contract code of the provided code hash. +func ReadCode(db ethdb.KeyValueReader, hash common.Hash) []byte { + // Try with the legacy code scheme first, if not then try with current + // scheme. Since most of the code will be found with legacy scheme. + // + // todo(rjl493456442) change the order when we forcibly upgrade the code + // scheme with snapshot. + data, _ := db.Get(hash[:]) + if len(data) != 0 { + return data + } + return ReadCodeWithPrefix(db, hash) +} + +// ReadCodeWithPrefix retrieves the contract code of the provided code hash. +// The main difference between this function and ReadCode is this function +// will only check the existence with latest scheme(with prefix). +func ReadCodeWithPrefix(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(codeKey(hash)) + return data +} + +// WriteCode writes the provided contract code database. +func WriteCode(db ethdb.KeyValueWriter, hash common.Hash, code []byte) { + if err := db.Put(codeKey(hash), code); err != nil { + log.Crit("Failed to store contract code", "err", err) + } +} + +// DeleteCode deletes the specified contract code from the database. +func DeleteCode(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(codeKey(hash)); err != nil { + log.Crit("Failed to delete contract code", "err", err) + } +} + +// ReadTrieNode retrieves the trie node of the provided hash. +func ReadTrieNode(db ethdb.KeyValueReader, hash common.Hash) []byte { + data, _ := db.Get(hash.Bytes()) + return data +} + +// WriteTrieNode writes the provided trie node database. +func WriteTrieNode(db ethdb.KeyValueWriter, hash common.Hash, node []byte) { + if err := db.Put(hash.Bytes(), node); err != nil { + log.Crit("Failed to store trie node", "err", err) + } +} + +// DeleteTrieNode deletes the specified trie node from the database. +func DeleteTrieNode(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(hash.Bytes()); err != nil { + log.Crit("Failed to delete trie node", "err", err) + } +} diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go index c99a97c5f8..c635cd2f12 100644 --- a/core/rawdb/chain_iterator_test.go +++ b/core/rawdb/chain_iterator_test.go @@ -34,11 +34,11 @@ func TestChainIterator(t *testing.T) { var txs []*types.Transaction for i := uint64(0); i <= 10; i++ { if i == 0 { - block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, nil, nil, nil) // Empty genesis block + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, nil, nil, nil, newHasher()) // Empty genesis block } else { tx := types.NewTransaction(i, common.BytesToAddress([]byte{0x11}), big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11}) txs = append(txs, tx) - block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, []*types.Transaction{tx}, nil, nil) + block = types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, []*types.Transaction{tx}, nil, nil, newHasher()) } WriteBlock(chainDb, block) WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index d22ca1c529..316b5addf3 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -258,6 +258,7 @@ func InspectDatabase(db ethdb.Database) error { numHashPairing common.StorageSize hashNumPairing common.StorageSize trieSize common.StorageSize + codeSize common.StorageSize txlookupSize common.StorageSize accountSnapSize common.StorageSize storageSnapSize common.StorageSize @@ -316,6 +317,8 @@ func InspectDatabase(db ethdb.Database) error { chtTrieNodes += size case bytes.HasPrefix(key, []byte("blt-")) && len(key) == 4+common.HashLength: bloomTrieNodes += size + case bytes.HasPrefix(key, codePrefix) && len(key) == len(codePrefix)+common.HashLength: + codeSize += size case len(key) == common.HashLength: trieSize += size default: @@ -355,6 +358,7 @@ func InspectDatabase(db ethdb.Database) error { {"Key-Value store", "Block hash->number", hashNumPairing.String()}, {"Key-Value store", "Transaction index", txlookupSize.String()}, {"Key-Value store", "Bloombit index", bloomBitsSize.String()}, + {"Key-Value store", "Contract codes", codeSize.String()}, {"Key-Value store", "Trie nodes", trieSize.String()}, {"Key-Value store", "Trie preimages", preimageSize.String()}, {"Key-Value store", "Account snapshot", accountSnapSize.String()}, diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b87e7888cc..e2b093a34a 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -18,6 +18,7 @@ package rawdb import ( + "bytes" "encoding/binary" "github.com/ethereum/go-ethereum/common" @@ -69,6 +70,7 @@ var ( bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value + codePrefix = []byte("c") // codePrefix + code hash -> account code preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -192,6 +194,20 @@ func preimageKey(hash common.Hash) []byte { return append(preimagePrefix, hash.Bytes()...) } +// codeKey = codePrefix + hash +func codeKey(hash common.Hash) []byte { + return append(codePrefix, hash.Bytes()...) +} + +// IsCodeKey reports whether the given byte slice is the key of contract code, +// if so return the raw code hash as well. +func IsCodeKey(key []byte) (bool, []byte) { + if bytes.HasPrefix(key, codePrefix) && len(key) == common.HashLength+len(codePrefix) { + return true, key[len(codePrefix):] + } + return false, nil +} + // configKey = configPrefix + hash func configKey(hash common.Hash) []byte { return append(configPrefix, hash.Bytes()...) diff --git a/core/state/database.go b/core/state/database.go index 7bcec6d003..a9342f5179 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -17,9 +17,12 @@ package state import ( + "errors" "fmt" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/trie" lru "github.com/hashicorp/golang-lru" @@ -28,6 +31,9 @@ import ( const ( // Number of codehash->size associations to keep. codeSizeCacheSize = 100000 + + // Cache size granted for caching clean code. + codeCacheSize = 64 * 1024 * 1024 ) // Database wraps access to tries and contract code. @@ -111,12 +117,14 @@ func NewDatabaseWithCache(db ethdb.Database, cache int, journal string) Database return &cachingDB{ db: trie.NewDatabaseWithCache(db, cache, journal), codeSizeCache: csc, + codeCache: fastcache.New(codeCacheSize), } } type cachingDB struct { db *trie.Database codeSizeCache *lru.Cache + codeCache *fastcache.Cache } // OpenTrie opens the main account trie at a specific root hash. @@ -141,11 +149,32 @@ func (db *cachingDB) CopyTrie(t Trie) Trie { // ContractCode retrieves a particular contract's code. func (db *cachingDB) ContractCode(addrHash, codeHash common.Hash) ([]byte, error) { - code, err := db.db.Node(codeHash) - if err == nil { + if code := db.codeCache.Get(nil, codeHash.Bytes()); len(code) > 0 { + return code, nil + } + code := rawdb.ReadCode(db.db.DiskDB(), codeHash) + if len(code) > 0 { + db.codeCache.Set(codeHash.Bytes(), code) + db.codeSizeCache.Add(codeHash, len(code)) + return code, nil + } + return nil, errors.New("not found") +} + +// ContractCodeWithPrefix retrieves a particular contract's code. If the +// code can't be found in the cache, then check the existence with **new** +// db scheme. +func (db *cachingDB) ContractCodeWithPrefix(addrHash, codeHash common.Hash) ([]byte, error) { + if code := db.codeCache.Get(nil, codeHash.Bytes()); len(code) > 0 { + return code, nil + } + code := rawdb.ReadCodeWithPrefix(db.db.DiskDB(), codeHash) + if len(code) > 0 { + db.codeCache.Set(codeHash.Bytes(), code) db.codeSizeCache.Add(codeHash, len(code)) + return code, nil } - return code, err + return nil, errors.New("not found") } // ContractCodeSize retrieves a particular contracts code's size. diff --git a/core/state/iterator_test.go b/core/state/iterator_test.go index 5060f7a651..d1afe9ca3e 100644 --- a/core/state/iterator_test.go +++ b/core/state/iterator_test.go @@ -28,6 +28,7 @@ import ( func TestNodeIteratorCoverage(t *testing.T) { // Create some arbitrary test state to iterate db, root, _ := makeTestState() + db.TrieDB().Commit(root, false, nil) state, err := New(root, db, nil) if err != nil { @@ -42,7 +43,10 @@ func TestNodeIteratorCoverage(t *testing.T) { } // Cross check the iterated hashes and the database/nodepool content for hash := range hashes { - if _, err := db.TrieDB().Node(hash); err != nil { + if _, err = db.TrieDB().Node(hash); err != nil { + _, err = db.ContractCode(common.Hash{}, hash) + } + if err != nil { t.Errorf("failed to retrieve reported node %x", hash) } } diff --git a/core/state/statedb.go b/core/state/statedb.go index 0134a9d443..cd020e6543 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -42,9 +43,6 @@ type revision struct { var ( // emptyRoot is the known root hash of an empty trie. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") - - // emptyCode is the known hash of the empty EVM bytecode. - emptyCode = crypto.Keccak256Hash(nil) ) type proofList [][]byte @@ -589,7 +587,10 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject) s.journal.append(resetObjectChange{prev: prev, prevdestruct: prevdestruct}) } s.setStateObject(newobj) - return newobj, prev + if prev != nil && !prev.deleted { + return newobj, prev + } + return newobj, nil } // CreateAccount explicitly creates a state object. If a state object with the address @@ -816,11 +817,12 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { s.IntermediateRoot(deleteEmptyObjects) // Commit objects to the trie, measuring the elapsed time + codeWriter := s.db.TrieDB().DiskDB().NewBatch() for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object if obj.code != nil && obj.dirtyCode { - s.db.TrieDB().InsertBlob(common.BytesToHash(obj.CodeHash()), obj.code) + rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false } // Write any storage changes in the state object to its storage trie @@ -832,6 +834,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { if len(s.stateObjectsDirty) > 0 { s.stateObjectsDirty = make(map[common.Address]struct{}) } + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + } + } // Write the account trie changes, measuing the amount of wasted time var start time.Time if metrics.EnabledExpensive { @@ -847,10 +854,6 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { if account.Root != emptyRoot { s.db.TrieDB().Reference(account.Root, parent) } - code := common.BytesToHash(account.CodeHash) - if code != emptyCode { - s.db.TrieDB().Reference(code, parent) - } return nil }) if metrics.EnabledExpensive { diff --git a/core/state/sync.go b/core/state/sync.go index ef79305273..052cfad7bb 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -34,7 +34,7 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.S return err } syncer.AddSubTrie(obj.Root, 64, parent, nil) - syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent) + syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), 64, parent) return nil } syncer = trie.NewSync(root, database, callback, bloom) diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 924c8c2f90..17670750ed 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -133,13 +133,17 @@ func TestEmptyStateSync(t *testing.T) { // Tests that given a root hash, a state can sync iteratively on a single thread, // requesting retrieval tasks and returning all of them in one go. -func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) } -func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) } +func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1, false) } +func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100, false) } +func TestIterativeStateSyncIndividualFromDisk(t *testing.T) { testIterativeStateSync(t, 1, true) } +func TestIterativeStateSyncBatchedFromDisk(t *testing.T) { testIterativeStateSync(t, 100, true) } -func testIterativeStateSync(t *testing.T, count int) { +func testIterativeStateSync(t *testing.T, count int, commit bool) { // Create a random state to copy srcDb, srcRoot, srcAccounts := makeTestState() - + if commit { + srcDb.TrieDB().Commit(srcRoot, false, nil) + } // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb)) @@ -149,13 +153,18 @@ func testIterativeStateSync(t *testing.T, count int) { results := make([]trie.SyncResult, len(queue)) for i, hash := range queue { data, err := srcDb.TrieDB().Node(hash) + if err != nil { + data, err = srcDb.ContractCode(common.Hash{}, hash) + } if err != nil { t.Fatalf("failed to retrieve node data for %x", hash) } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := dstDb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -184,13 +193,18 @@ func TestIterativeDelayedStateSync(t *testing.T) { results := make([]trie.SyncResult, len(queue)/2+1) for i, hash := range queue[:len(results)] { data, err := srcDb.TrieDB().Node(hash) + if err != nil { + data, err = srcDb.ContractCode(common.Hash{}, hash) + } if err != nil { t.Fatalf("failed to retrieve node data for %x", hash) } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := dstDb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -226,14 +240,19 @@ func testIterativeRandomStateSync(t *testing.T, count int) { results := make([]trie.SyncResult, 0, len(queue)) for hash := range queue { data, err := srcDb.TrieDB().Node(hash) + if err != nil { + data, err = srcDb.ContractCode(common.Hash{}, hash) + } if err != nil { t.Fatalf("failed to retrieve node data for %x", hash) } results = append(results, trie.SyncResult{Hash: hash, Data: data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := dstDb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -270,6 +289,9 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { delete(queue, hash) data, err := srcDb.TrieDB().Node(hash) + if err != nil { + data, err = srcDb.ContractCode(common.Hash{}, hash) + } if err != nil { t.Fatalf("failed to retrieve node data for %x", hash) } @@ -280,8 +302,10 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := dstDb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -302,6 +326,15 @@ func TestIncompleteStateSync(t *testing.T) { // Create a random state to copy srcDb, srcRoot, srcAccounts := makeTestState() + // isCode reports whether the hash is contract code hash. + isCode := func(hash common.Hash) bool { + for _, acc := range srcAccounts { + if hash == crypto.Keccak256Hash(acc.code) { + return true + } + } + return false + } checkTrieConsistency(srcDb.TrieDB().DiskDB().(ethdb.Database), srcRoot) // Create a destination state and sync with the scheduler @@ -315,14 +348,19 @@ func TestIncompleteStateSync(t *testing.T) { results := make([]trie.SyncResult, len(queue)) for i, hash := range queue { data, err := srcDb.TrieDB().Node(hash) + if err != nil { + data, err = srcDb.ContractCode(common.Hash{}, hash) + } if err != nil { t.Fatalf("failed to retrieve node data for %x", hash) } results[i] = trie.SyncResult{Hash: hash, Data: data} } // Process each of the state nodes - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := dstDb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -333,12 +371,9 @@ func TestIncompleteStateSync(t *testing.T) { added = append(added, result.Hash) } // Check that all known sub-tries added so far are complete or missing entirely. - checkSubtries: for _, hash := range added { - for _, acc := range srcAccounts { - if hash == crypto.Keccak256Hash(acc.code) { - continue checkSubtries // skip trie check of code nodes. - } + if isCode(hash) { + continue } // Can't use checkStateConsistency here because subtrie keys may have odd // length and crash in LeafKey. @@ -351,13 +386,25 @@ func TestIncompleteStateSync(t *testing.T) { } // Sanity check that removing any node from the database is detected for _, node := range added[1:] { - key := node.Bytes() - value, _ := dstDb.Get(key) - - dstDb.Delete(key) + var ( + key = node.Bytes() + code = isCode(node) + val []byte + ) + if code { + val = rawdb.ReadCode(dstDb, node) + rawdb.DeleteCode(dstDb, node) + } else { + val = rawdb.ReadTrieNode(dstDb, node) + rawdb.DeleteTrieNode(dstDb, node) + } if err := checkStateConsistency(dstDb, added[0]); err == nil { t.Fatalf("trie inconsistency not caught, missing: %x", key) } - dstDb.Put(key, value) + if code { + rawdb.WriteCode(dstDb, node, val) + } else { + rawdb.WriteTrieNode(dstDb, node, val) + } } } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f87d6fbea9..4fca734e65 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) // testTxPoolConfig is a transaction pool configuration without stateful disk @@ -54,7 +55,7 @@ type testBlockChain struct { func (bc *testBlockChain) CurrentBlock() *types.Block { return types.NewBlock(&types.Header{ GasLimit: bc.gasLimit, - }, nil, nil, nil) + }, nil, nil, nil, new(trie.Trie)) } func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { diff --git a/core/types/block.go b/core/types/block.go index 8316cd7f3a..8096ebb755 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -35,7 +35,7 @@ import ( ) var ( - EmptyRootHash = DeriveSha(Transactions{}) + EmptyRootHash = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") EmptyUncleHash = rlpHash([]*Header(nil)) ) @@ -221,14 +221,14 @@ type storageblock struct { // The values of TxHash, UncleHash, ReceiptHash and Bloom in header // are ignored and set to values derived from the given txs, uncles // and receipts. -func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*Receipt) *Block { +func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*Receipt, hasher Hasher) *Block { b := &Block{header: CopyHeader(header), td: new(big.Int)} // TODO: panic if len(txs) != len(receipts) if len(txs) == 0 { b.header.TxHash = EmptyRootHash } else { - b.header.TxHash = DeriveSha(Transactions(txs)) + b.header.TxHash = DeriveSha(Transactions(txs), hasher) b.transactions = make(Transactions, len(txs)) copy(b.transactions, txs) } @@ -236,7 +236,7 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []* if len(receipts) == 0 { b.header.ReceiptHash = EmptyRootHash } else { - b.header.ReceiptHash = DeriveSha(Receipts(receipts)) + b.header.ReceiptHash = DeriveSha(Receipts(receipts), hasher) b.header.Bloom = CreateBloom(receipts) } diff --git a/core/types/block_test.go b/core/types/block_test.go index 46ad00c6eb..4dfdcf9545 100644 --- a/core/types/block_test.go +++ b/core/types/block_test.go @@ -18,6 +18,7 @@ package types import ( "bytes" + "hash" "math/big" "reflect" "testing" @@ -27,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" ) // from bcValidBlockTest.json, "SimpleTx" @@ -90,6 +92,30 @@ func BenchmarkEncodeBlock(b *testing.B) { } } +// testHasher is the helper tool for transaction/receipt list hashing. +// The original hasher is trie, in order to get rid of import cycle, +// use the testing hasher instead. +type testHasher struct { + hasher hash.Hash +} + +func newHasher() *testHasher { + return &testHasher{hasher: sha3.NewLegacyKeccak256()} +} + +func (h *testHasher) Reset() { + h.hasher.Reset() +} + +func (h *testHasher) Update(key, val []byte) { + h.hasher.Write(key) + h.hasher.Write(val) +} + +func (h *testHasher) Hash() common.Hash { + return common.BytesToHash(h.hasher.Sum(nil)) +} + func makeBenchBlock() *Block { var ( key, _ = crypto.GenerateKey() @@ -128,5 +154,5 @@ func makeBenchBlock() *Block { Extra: []byte("benchmark uncle"), } } - return NewBlock(header, txs, uncles, receipts) + return NewBlock(header, txs, uncles, receipts, newHasher()) } diff --git a/core/types/derive_sha.go b/core/types/derive_sha.go index 00c42c5bc6..7d40c7f660 100644 --- a/core/types/derive_sha.go +++ b/core/types/derive_sha.go @@ -21,21 +21,28 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/trie" ) +// DerivableList is the interface which can derive the hash. type DerivableList interface { Len() int GetRlp(i int) []byte } -func DeriveSha(list DerivableList) common.Hash { +// Hasher is the tool used to calculate the hash of derivable list. +type Hasher interface { + Reset() + Update([]byte, []byte) + Hash() common.Hash +} + +func DeriveSha(list DerivableList, hasher Hasher) common.Hash { + hasher.Reset() keybuf := new(bytes.Buffer) - trie := new(trie.Trie) for i := 0; i < list.Len(); i++ { keybuf.Reset() rlp.Encode(keybuf, uint(i)) - trie.Update(keybuf.Bytes(), list.GetRlp(i)) + hasher.Update(keybuf.Bytes(), list.GetRlp(i)) } - return trie.Hash() + return hasher.Hash() } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f3b0926d77..59b5abaa60 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -109,7 +109,7 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database // Database to state sync into (and deduplicate via) - stateBloom *trie.SyncBloom // Bloom filter for fast trie node existence checks + stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks // Statistics syncStatsChainOrigin uint64 // Origin block number where syncing started at diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 87225cb625..aba4d5dbf7 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/trie" ) const ( @@ -771,7 +772,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi q.lock.Lock() defer q.lock.Unlock() validate := func(index int, header *types.Header) error { - if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash { + if types.DeriveSha(types.Transactions(txLists[index]), new(trie.Trie)) != header.TxHash { return errInvalidBody } if types.CalcUncleHash(uncleLists[index]) != header.UncleHash { @@ -796,7 +797,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, q.lock.Lock() defer q.lock.Unlock() validate := func(index int, header *types.Header) error { - if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash { + if types.DeriveSha(types.Receipts(receiptList[index]), new(trie.Trie)) != header.ReceiptHash { return errInvalidReceipt } return nil diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 25c8fccb5b..bf9e96fe2a 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -474,7 +474,7 @@ func (s *stateSync) process(req *stateReq) (int, error) { // Iterate over all the delivered data and inject one-by-one into the trie for _, blob := range req.response { - _, hash, err := s.processNodeData(blob) + hash, err := s.processNodeData(blob) switch err { case nil: s.numUncommitted++ @@ -512,13 +512,13 @@ func (s *stateSync) process(req *stateReq) (int, error) { // processNodeData tries to inject a trie node data blob delivered from a remote // peer into the state trie, returning whether anything useful was written or any // error occurred. -func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) { +func (s *stateSync) processNodeData(blob []byte) (common.Hash, error) { res := trie.SyncResult{Data: blob} s.keccak.Reset() s.keccak.Write(blob) s.keccak.Sum(res.Hash[:0]) - committed, _, err := s.sched.Process([]trie.SyncResult{res}) - return committed, res.Hash, err + err := s.sched.Process(res) + return res.Hash, err } // updateStats bumps the various state sync progress counters and displays a log diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 2c2dabad96..270aaf5918 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/trie" ) const ( @@ -540,7 +541,7 @@ func (f *BlockFetcher) loop() { announce.time = task.time // If the block is empty (header only), short circuit into the final import queue - if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { + if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash { log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) block := types.NewBlockWithHeader(header) @@ -619,7 +620,7 @@ func (f *BlockFetcher) loop() { continue } if txnHash == (common.Hash{}) { - txnHash = types.DeriveSha(types.Transactions(task.transactions[i])) + txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), new(trie.Trie)) } if txnHash != announce.header.TxHash { continue diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index a6854ffcf8..3220002a99 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) var ( @@ -38,7 +39,7 @@ var ( testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") testAddress = crypto.PubkeyToAddress(testKey.PublicKey) genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) - unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil) + unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil, new(trie.Trie)) ) // makeChain creates a chain of n blocks starting at and including parent. diff --git a/eth/handler.go b/eth/handler.go index 1a15765dda..3a051abf5e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -608,7 +608,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } // Retrieve the requested state entry, stopping if enough was found - if entry, err := pm.blockchain.TrieNode(hash); err == nil { + // todo now the code and trienode is mixed in the protocol level, + // separate these two types. + entry, err := pm.blockchain.TrieNode(hash) + if len(entry) == 0 || err != nil { + // Read the contract code with prefix only to save unnecessary lookups. + entry, err = pm.blockchain.ContractCodeWithPrefix(hash) + } + if err == nil && len(entry) > 0 { data = append(data, entry) bytes += len(entry) } @@ -703,7 +710,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Warn("Propagated block has invalid uncles", "have", hash, "exp", request.Block.UncleHash()) break // TODO(karalabe): return error eventually, but wait a few releases } - if hash := types.DeriveSha(request.Block.Transactions()); hash != request.Block.TxHash() { + if hash := types.DeriveSha(request.Block.Transactions(), new(trie.Trie)); hash != request.Block.TxHash() { log.Warn("Propagated block has invalid body", "have", hash, "exp", request.Block.TxHash()) break // TODO(karalabe): return error eventually, but wait a few releases } diff --git a/les/odr_requests.go b/les/odr_requests.go index 8c1e0102f5..3cc55c98d8 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -116,7 +116,7 @@ func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error { if r.Header == nil { return errHeaderUnavailable } - if r.Header.TxHash != types.DeriveSha(types.Transactions(body.Transactions)) { + if r.Header.TxHash != types.DeriveSha(types.Transactions(body.Transactions), new(trie.Trie)) { return errTxHashMismatch } if r.Header.UncleHash != types.CalcUncleHash(body.Uncles) { @@ -174,7 +174,7 @@ func (r *ReceiptsRequest) Validate(db ethdb.Database, msg *Msg) error { if r.Header == nil { return errHeaderUnavailable } - if r.Header.ReceiptHash != types.DeriveSha(receipt) { + if r.Header.ReceiptHash != types.DeriveSha(receipt, new(trie.Trie)) { return errReceiptHashMismatch } // Validations passed, store and return diff --git a/les/server_handler.go b/les/server_handler.go index c474363232..463f51cb43 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -489,7 +489,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { p.bumpInvalid() continue } - code, err := triedb.Node(common.BytesToHash(account.CodeHash)) + code, err := h.blockchain.StateCache().ContractCode(common.BytesToHash(request.AccKey), common.BytesToHash(account.CodeHash)) if err != nil { p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err) continue diff --git a/light/odr.go b/light/odr.go index 1ea98ca5aa..0b854b0b6c 100644 --- a/light/odr.go +++ b/light/odr.go @@ -101,7 +101,7 @@ type CodeRequest struct { // StoreResult stores the retrieved data in local database func (req *CodeRequest) StoreResult(db ethdb.Database) { - db.Put(req.Hash[:], req.Data) + rawdb.WriteCode(db, req.Hash, req.Data) } // BlockRequest is the ODR request type for retrieving block bodies diff --git a/light/odr_test.go b/light/odr_test.go index 78bf373e60..5f7f4d96cb 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -87,7 +87,7 @@ func (odr *testOdr) Retrieve(ctx context.Context, req OdrRequest) error { t.Prove(req.Key, 0, nodes) req.Proof = nodes case *CodeRequest: - req.Data, _ = odr.sdb.Get(req.Hash[:]) + req.Data = rawdb.ReadCode(odr.sdb, req.Hash) } req.StoreResult(odr.ldb) return nil diff --git a/light/trie.go b/light/trie.go index 0d69e74e21..3eb05f4a3f 100644 --- a/light/trie.go +++ b/light/trie.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -70,7 +71,8 @@ func (db *odrDatabase) ContractCode(addrHash, codeHash common.Hash) ([]byte, err if codeHash == sha3Nil { return nil, nil } - if code, err := db.backend.Database().Get(codeHash[:]); err == nil { + code := rawdb.ReadCode(db.backend.Database(), codeHash) + if len(code) != 0 { return code, nil } id := *db.id diff --git a/miner/worker.go b/miner/worker.go index 177e727283..f042fd8e33 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" ) const ( @@ -711,6 +712,7 @@ func (w *worker) updateSnapshot() { w.current.txs, uncles, w.current.receipts, + new(trie.Trie), ) w.snapshotState = w.current.state.Copy() diff --git a/trie/database.go b/trie/database.go index 0e9f306e63..fa8906b7a3 100644 --- a/trie/database.go +++ b/trie/database.go @@ -27,6 +27,7 @@ import ( "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -57,15 +58,6 @@ var ( memcacheCommitSizeMeter = metrics.NewRegisteredMeter("trie/memcache/commit/size", nil) ) -// secureKeyPrefix is the database key prefix used to store trie node preimages. -var secureKeyPrefix = []byte("secure-key-") - -// secureKeyPrefixLength is the length of the above prefix -const secureKeyPrefixLength = 11 - -// secureKeyLength is the length of the above prefix + 32byte hash. -const secureKeyLength = secureKeyPrefixLength + 32 - // Database is an intermediate write layer between the trie data structures and // the disk database. The aim is to accumulate trie writes in-memory and only // periodically flush a couple tries to disk, garbage collecting the remainder. @@ -78,7 +70,7 @@ type Database struct { diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs - dirties map[common.Hash]*cachedNode // Data and references relationships of dirty nodes + dirties map[common.Hash]*cachedNode // Data and references relationships of dirty trie nodes oldest common.Hash // Oldest tracked node, flush-list head newest common.Hash // Newest tracked node, flush-list tail @@ -139,8 +131,8 @@ type rawShortNode struct { func (n rawShortNode) cache() (hashNode, bool) { panic("this should never end up in a live trie") } func (n rawShortNode) fstring(ind string) string { panic("this should never end up in a live trie") } -// cachedNode is all the information we know about a single cached node in the -// memory database write layer. +// cachedNode is all the information we know about a single cached trie node +// in the memory database write layer. type cachedNode struct { node node // Cached collapsed trie node, or raw rlp data size uint16 // Byte size of the useful cached data @@ -161,8 +153,8 @@ var cachedNodeSize = int(reflect.TypeOf(cachedNode{}).Size()) // reference map. const cachedNodeChildrenSize = 48 -// rlp returns the raw rlp encoded blob of the cached node, either directly from -// the cache, or by regenerating it from the collapsed node. +// rlp returns the raw rlp encoded blob of the cached trie node, either directly +// from the cache, or by regenerating it from the collapsed node. func (n *cachedNode) rlp() []byte { if node, ok := n.node.(rawNode); ok { return node @@ -183,9 +175,9 @@ func (n *cachedNode) obj(hash common.Hash) node { return expandNode(hash[:], n.node) } -// forChilds invokes the callback for all the tracked children of this node, -// both the implicit ones from inside the node as well as the explicit ones -//from outside the node. +// forChilds invokes the callback for all the tracked children of this node, +// both the implicit ones from inside the node as well as the explicit ones +// from outside the node. func (n *cachedNode) forChilds(onChild func(hash common.Hash)) { for child := range n.children { onChild(child) @@ -305,25 +297,14 @@ func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int, journal string) } // DiskDB retrieves the persistent storage backing the trie database. -func (db *Database) DiskDB() ethdb.KeyValueReader { +func (db *Database) DiskDB() ethdb.KeyValueStore { return db.diskdb } -// InsertBlob writes a new reference tracked blob to the memory database if it's -// yet unknown. This method should only be used for non-trie nodes that require -// reference counting, since trie nodes are garbage collected directly through -// their embedded children. -func (db *Database) InsertBlob(hash common.Hash, blob []byte) { - db.lock.Lock() - defer db.lock.Unlock() - - db.insert(hash, len(blob), rawNode(blob)) -} - -// insert inserts a collapsed trie node into the memory database. This method is -// a more generic version of InsertBlob, supporting both raw blob insertions as -// well ex trie node insertions. The blob size must be specified to allow proper -// size tracking. +// insert inserts a collapsed trie node into the memory database. +// The blob size must be specified to allow proper size tracking. +// All nodes inserted by this function will be reference tracked +// and in theory should only used for **trie nodes** insertion. func (db *Database) insert(hash common.Hash, size int, node node) { // If the node's already cached, skip if _, ok := db.dirties[hash]; ok { @@ -430,39 +411,30 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) { memcacheDirtyMissMeter.Mark(1) // Content unavailable in memory, attempt to retrieve from disk - enc, err := db.diskdb.Get(hash[:]) - if err == nil && enc != nil { + enc := rawdb.ReadTrieNode(db.diskdb, hash) + if len(enc) != 0 { if db.cleans != nil { db.cleans.Set(hash[:], enc) memcacheCleanMissMeter.Mark(1) memcacheCleanWriteMeter.Mark(int64(len(enc))) } + return enc, nil } - return enc, err + return nil, errors.New("not found") } // preimage retrieves a cached trie node pre-image from memory. If it cannot be // found cached, the method queries the persistent database for the content. -func (db *Database) preimage(hash common.Hash) ([]byte, error) { +func (db *Database) preimage(hash common.Hash) []byte { // Retrieve the node from cache if available db.lock.RLock() preimage := db.preimages[hash] db.lock.RUnlock() if preimage != nil { - return preimage, nil + return preimage } - // Content unavailable in memory, attempt to retrieve from disk - return db.diskdb.Get(secureKey(hash)) -} - -// secureKey returns the database key for the preimage of key (as a newly -// allocated byte-slice) -func secureKey(hash common.Hash) []byte { - buf := make([]byte, secureKeyLength) - copy(buf, secureKeyPrefix) - copy(buf[secureKeyPrefixLength:], hash[:]) - return buf + return rawdb.ReadPreimage(db.diskdb, hash) } // Nodes retrieves the hashes of all the nodes cached within the memory database. @@ -482,6 +454,9 @@ func (db *Database) Nodes() []common.Hash { } // Reference adds a new reference from a parent node to a child node. +// This function is used to add reference between internal trie node +// and external node(e.g. storage trie root), all internal trie nodes +// are referenced together by database itself. func (db *Database) Reference(child common.Hash, parent common.Hash) { db.lock.Lock() defer db.lock.Unlock() @@ -604,27 +579,16 @@ func (db *Database) Cap(limit common.StorageSize) error { size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*cachedNodeSize) size += db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2)) - // We reuse an ephemeral buffer for the keys. The batch Put operation - // copies it internally, so we can reuse it. - var keyBuf [secureKeyLength]byte - copy(keyBuf[:], secureKeyPrefix) - // If the preimage cache got large enough, push to disk. If it's still small // leave for later to deduplicate writes. flushPreimages := db.preimagesSize > 4*1024*1024 if flushPreimages { - for hash, preimage := range db.preimages { - copy(keyBuf[secureKeyPrefixLength:], hash[:]) - if err := batch.Put(keyBuf[:], preimage); err != nil { - log.Error("Failed to commit preimage from trie database", "err", err) + rawdb.WritePreimages(batch, db.preimages) + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { return err } - if batch.ValueSize() > ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - } + batch.Reset() } } // Keep committing nodes from the flush-list until we're below allowance @@ -632,9 +596,8 @@ func (db *Database) Cap(limit common.StorageSize) error { for size > limit && oldest != (common.Hash{}) { // Fetch the oldest referenced node and push into the batch node := db.dirties[oldest] - if err := batch.Put(oldest[:], node.rlp()); err != nil { - return err - } + rawdb.WriteTrieNode(batch, oldest, node.rlp()) + // If we exceeded the ideal batch size, commit and reset if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { @@ -662,8 +625,7 @@ func (db *Database) Cap(limit common.StorageSize) error { defer db.lock.Unlock() if flushPreimages { - db.preimages = make(map[common.Hash][]byte) - db.preimagesSize = 0 + db.preimages, db.preimagesSize = make(map[common.Hash][]byte), 0 } for db.oldest != oldest { node := db.dirties[db.oldest] @@ -706,25 +668,13 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H start := time.Now() batch := db.diskdb.NewBatch() - // We reuse an ephemeral buffer for the keys. The batch Put operation - // copies it internally, so we can reuse it. - var keyBuf [secureKeyLength]byte - copy(keyBuf[:], secureKeyPrefix) - // Move all of the accumulated preimages into a write batch - for hash, preimage := range db.preimages { - copy(keyBuf[secureKeyPrefixLength:], hash[:]) - if err := batch.Put(keyBuf[:], preimage); err != nil { - log.Error("Failed to commit preimage from trie database", "err", err) + rawdb.WritePreimages(batch, db.preimages) + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { return err } - // If the batch is too large, flush to disk - if batch.ValueSize() > ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - } + batch.Reset() } // Since we're going to replay trie node writes into the clean cache, flush out // any batched pre-images before continuing. @@ -754,8 +704,7 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H batch.Reset() // Reset the storage counters and bumpd metrics - db.preimages = make(map[common.Hash][]byte) - db.preimagesSize = 0 + db.preimages, db.preimagesSize = make(map[common.Hash][]byte), 0 memcacheCommitTimeTimer.Update(time.Since(start)) memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize)) @@ -791,13 +740,11 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane if err != nil { return err } - if err := batch.Put(hash[:], node.rlp()); err != nil { - return err - } + // If we've reached an optimal batch size, commit and start over + rawdb.WriteTrieNode(batch, hash, node.rlp()) if callback != nil { callback(hash) } - // If we've reached an optimal batch size, commit and start over if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return err diff --git a/trie/secure_trie.go b/trie/secure_trie.go index bd8e51d989..ae1bbc6aa9 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -130,8 +130,7 @@ func (t *SecureTrie) GetKey(shaKey []byte) []byte { if key, ok := t.getSecKeyCache()[string(shaKey)]; ok { return key } - key, _ := t.trie.db.preimage(common.BytesToHash(shaKey)) - return key + return t.trie.db.preimage(common.BytesToHash(shaKey)) } // Commit writes all nodes and the secure hash pre-images to the trie's database. diff --git a/trie/sync.go b/trie/sync.go index 620e97fa30..af99466416 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" ) @@ -37,7 +38,7 @@ var ErrAlreadyProcessed = errors.New("already processed") type request struct { hash common.Hash // Hash of the node data content to retrieve data []byte // Data content of the node, cached until all subtrees complete - raw bool // Whether this is a raw entry (code) or a trie node + code bool // Whether this is a code entry parents []*request // Parent state nodes referencing this entry (notify all upon completion) depth int // Depth level within the trie the node is located to prioritise DFS @@ -46,8 +47,7 @@ type request struct { callback LeafCallback // Callback to invoke if a leaf node it reached on this branch } -// SyncResult is a simple list to return missing nodes along with their request -// hashes. +// SyncResult is a response with requested data along with it's hash. type SyncResult struct { Hash common.Hash // Hash of the originally unknown trie node Data []byte // Data content of the retrieved node @@ -56,25 +56,40 @@ type SyncResult struct { // syncMemBatch is an in-memory buffer of successfully downloaded but not yet // persisted data items. type syncMemBatch struct { - batch map[common.Hash][]byte // In-memory membatch of recently completed items + nodes map[common.Hash][]byte // In-memory membatch of recently completed nodes + codes map[common.Hash][]byte // In-memory membatch of recently completed codes } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. func newSyncMemBatch() *syncMemBatch { return &syncMemBatch{ - batch: make(map[common.Hash][]byte), + nodes: make(map[common.Hash][]byte), + codes: make(map[common.Hash][]byte), } } +// hasNode reports the trie node with specific hash is already cached. +func (batch *syncMemBatch) hasNode(hash common.Hash) bool { + _, ok := batch.nodes[hash] + return ok +} + +// hasCode reports the contract code with specific hash is already cached. +func (batch *syncMemBatch) hasCode(hash common.Hash) bool { + _, ok := batch.codes[hash] + return ok +} + // Sync is the main state trie synchronisation scheduler, which provides yet // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type Sync struct { database ethdb.KeyValueReader // Persistent database to check for existing entries membatch *syncMemBatch // Memory buffer to avoid frequent database writes - requests map[common.Hash]*request // Pending requests pertaining to a key hash + nodeReqs map[common.Hash]*request // Pending requests pertaining to a trie node hash + codeReqs map[common.Hash]*request // Pending requests pertaining to a code hash queue *prque.Prque // Priority queue with the pending requests - bloom *SyncBloom // Bloom filter for fast node existence checks + bloom *SyncBloom // Bloom filter for fast state existence checks } // NewSync creates a new trie data download scheduler. @@ -82,7 +97,8 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb ts := &Sync{ database: database, membatch: newSyncMemBatch(), - requests: make(map[common.Hash]*request), + nodeReqs: make(map[common.Hash]*request), + codeReqs: make(map[common.Hash]*request), queue: prque.New(nil), bloom: bloom, } @@ -96,13 +112,15 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb if root == emptyRoot { return } - if _, ok := s.membatch.batch[root]; ok { + if s.membatch.hasNode(root) { return } if s.bloom == nil || s.bloom.Contains(root[:]) { - // Bloom filter says this might be a duplicate, double check - blob, _ := s.database.Get(root[:]) - if local, err := decodeNode(root[:], blob); local != nil && err == nil { + // Bloom filter says this might be a duplicate, double check. + // If database says yes, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + blob := rawdb.ReadTrieNode(s.database, root) + if len(blob) > 0 { return } // False positive, bump fault meter @@ -116,7 +134,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb } // If this sub-trie has a designated parent, link them together if parent != (common.Hash{}) { - ancestor := s.requests[parent] + ancestor := s.nodeReqs[parent] if ancestor == nil { panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent)) } @@ -126,21 +144,25 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb s.schedule(req) } -// AddRawEntry schedules the direct retrieval of a state entry that should not be -// interpreted as a trie node, but rather accepted and stored into the database -// as is. This method's goal is to support misc state metadata retrievals (e.g. -// contract code). -func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) { +// AddCodeEntry schedules the direct retrieval of a contract code that should not +// be interpreted as a trie node, but rather accepted and stored into the database +// as is. +func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) { // Short circuit if the entry is empty or already known if hash == emptyState { return } - if _, ok := s.membatch.batch[hash]; ok { + if s.membatch.hasCode(hash) { return } if s.bloom == nil || s.bloom.Contains(hash[:]) { - // Bloom filter says this might be a duplicate, double check - if ok, _ := s.database.Has(hash[:]); ok { + // Bloom filter says this might be a duplicate, double check. + // If database says yes, the blob is present for sure. + // Note we only check the existence with new code scheme, fast + // sync is expected to run with a fresh new node. Even there + // exists the code with legacy format, fetch and store with + // new scheme anyway. + if blob := rawdb.ReadCodeWithPrefix(s.database, hash); len(blob) > 0 { return } // False positive, bump fault meter @@ -149,12 +171,12 @@ func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) { // Assemble the new sub-trie sync request req := &request{ hash: hash, - raw: true, + code: true, depth: depth, } // If this sub-trie has a designated parent, link them together if parent != (common.Hash{}) { - ancestor := s.requests[parent] + ancestor := s.nodeReqs[parent] // the parent of codereq can ONLY be nodereq if ancestor == nil { panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent)) } @@ -173,61 +195,64 @@ func (s *Sync) Missing(max int) []common.Hash { return requests } -// Process injects a batch of retrieved trie nodes data, returning if something -// was committed to the database and also the index of an entry if its processing -// failed. -func (s *Sync) Process(results []SyncResult) (bool, int, error) { - committed := false - - for i, item := range results { - // If the item was not requested, bail out - request := s.requests[item.Hash] - if request == nil { - return committed, i, ErrNotRequested - } - if request.data != nil { - return committed, i, ErrAlreadyProcessed - } - // If the item is a raw entry request, commit directly - if request.raw { - request.data = item.Data - s.commit(request) - committed = true - continue - } +// Process injects the received data for requested item. Note it can +// happpen that the single response commits two pending requests(e.g. +// there are two requests one for code and one for node but the hash +// is same). In this case the second response for the same hash will +// be treated as "non-requested" item or "already-processed" item but +// there is no downside. +func (s *Sync) Process(result SyncResult) error { + // If the item was not requested either for code or node, bail out + if s.nodeReqs[result.Hash] == nil && s.codeReqs[result.Hash] == nil { + return ErrNotRequested + } + // There is an pending code request for this data, commit directly + var filled bool + if req := s.codeReqs[result.Hash]; req != nil && req.data == nil { + filled = true + req.data = result.Data + s.commit(req) + } + // There is an pending node request for this data, fill it. + if req := s.nodeReqs[result.Hash]; req != nil && req.data == nil { + filled = true // Decode the node data content and update the request - node, err := decodeNode(item.Hash[:], item.Data) + node, err := decodeNode(result.Hash[:], result.Data) if err != nil { - return committed, i, err + return err } - request.data = item.Data + req.data = result.Data // Create and schedule a request for all the children nodes - requests, err := s.children(request, node) + requests, err := s.children(req, node) if err != nil { - return committed, i, err - } - if len(requests) == 0 && request.deps == 0 { - s.commit(request) - committed = true - continue + return err } - request.deps += len(requests) - for _, child := range requests { - s.schedule(child) + if len(requests) == 0 && req.deps == 0 { + s.commit(req) + } else { + req.deps += len(requests) + for _, child := range requests { + s.schedule(child) + } } } - return committed, 0, nil + if !filled { + return ErrAlreadyProcessed + } + return nil } // Commit flushes the data stored in the internal membatch out to persistent // storage, returning any occurred error. func (s *Sync) Commit(dbw ethdb.Batch) error { // Dump the membatch into a database dbw - for key, value := range s.membatch.batch { - if err := dbw.Put(key[:], value); err != nil { - return err - } + for key, value := range s.membatch.nodes { + rawdb.WriteTrieNode(dbw, key, value) + s.bloom.Add(key[:]) + } + for key, value := range s.membatch.codes { + rawdb.WriteCode(dbw, key, value) s.bloom.Add(key[:]) } // Drop the membatch data and return @@ -237,21 +262,30 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { // Pending returns the number of state entries currently pending for download. func (s *Sync) Pending() int { - return len(s.requests) + return len(s.nodeReqs) + len(s.codeReqs) } // schedule inserts a new state retrieval request into the fetch queue. If there // is already a pending request for this node, the new request will be discarded // and only a parent reference added to the old one. func (s *Sync) schedule(req *request) { + var reqset = s.nodeReqs + if req.code { + reqset = s.codeReqs + } // If we're already requesting this node, add a new reference and stop - if old, ok := s.requests[req.hash]; ok { + if old, ok := reqset[req.hash]; ok { old.parents = append(old.parents, req.parents...) return } - // Schedule the request for future retrieval + reqset[req.hash] = req + + // Schedule the request for future retrieval. This queue is shared + // by both node requests and code requests. It can happen that there + // is a trie node and code has same hash. In this case two elements + // with same hash and same or different depth will be pushed. But it's + // ok the worst case is the second response will be treated as duplicated. s.queue.Push(req.hash, int64(req.depth)) - s.requests[req.hash] = req } // children retrieves all the missing children of a state trie entry for future @@ -297,12 +331,14 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { if node, ok := (child.node).(hashNode); ok { // Try to resolve the node from the local database hash := common.BytesToHash(node) - if _, ok := s.membatch.batch[hash]; ok { + if s.membatch.hasNode(hash) { continue } if s.bloom == nil || s.bloom.Contains(node) { - // Bloom filter says this might be a duplicate, double check - if ok, _ := s.database.Has(node); ok { + // Bloom filter says this might be a duplicate, double check. + // If database says yes, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + if blob := rawdb.ReadTrieNode(s.database, common.BytesToHash(node)); len(blob) > 0 { continue } // False positive, bump fault meter @@ -325,10 +361,13 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { // committed themselves. func (s *Sync) commit(req *request) (err error) { // Write the node content to the membatch - s.membatch.batch[req.hash] = req.data - - delete(s.requests, req.hash) - + if req.code { + s.membatch.codes[req.hash] = req.data + delete(s.codeReqs, req.hash) + } else { + s.membatch.nodes[req.hash] = req.data + delete(s.nodeReqs, req.hash) + } // Check all parents for completion for _, parent := range req.parents { parent.deps-- diff --git a/trie/sync_bloom.go b/trie/sync_bloom.go index 3108b05935..89f61d66d9 100644 --- a/trie/sync_bloom.go +++ b/trie/sync_bloom.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -41,8 +42,8 @@ var ( ) // syncBloomHasher is a wrapper around a byte blob to satisfy the interface API -// requirements of the bloom library used. It's used to convert a trie hash into -// a 64 bit mini hash. +// requirements of the bloom library used. It's used to convert a trie hash or +// contract code hash into a 64 bit mini hash. type syncBloomHasher []byte func (f syncBloomHasher) Write(p []byte) (n int, err error) { panic("not implemented") } @@ -53,9 +54,9 @@ func (f syncBloomHasher) Size() int { return 8 } func (f syncBloomHasher) Sum64() uint64 { return binary.BigEndian.Uint64(f) } // SyncBloom is a bloom filter used during fast sync to quickly decide if a trie -// node already exists on disk or not. It self populates from the provided disk -// database on creation in a background thread and will only start returning live -// results once that's finished. +// node or contract code already exists on disk or not. It self populates from the +// provided disk database on creation in a background thread and will only start +// returning live results once that's finished. type SyncBloom struct { bloom *bloomfilter.Filter inited uint32 @@ -107,10 +108,16 @@ func (b *SyncBloom) init(database ethdb.Iteratee) { ) for it.Next() && atomic.LoadUint32(&b.closed) == 0 { // If the database entry is a trie node, add it to the bloom - if key := it.Key(); len(key) == common.HashLength { + key := it.Key() + if len(key) == common.HashLength { b.bloom.Add(syncBloomHasher(key)) bloomLoadMeter.Mark(1) } + // If the database entry is a contract code, add it to the bloom + if ok, hash := rawdb.IsCodeKey(key); ok { + b.bloom.Add(syncBloomHasher(hash)) + bloomLoadMeter.Mark(1) + } // If enough time elapsed since the last iterator swap, restart if time.Since(swap) > 8*time.Second { key := common.CopyBytes(it.Key()) diff --git a/trie/sync_test.go b/trie/sync_test.go index 6025b87fcc..34f3990576 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -124,8 +124,10 @@ func testIterativeSync(t *testing.T, count int) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := diskdb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -160,8 +162,10 @@ func TestIterativeDelayedSync(t *testing.T) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := diskdb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -204,8 +208,10 @@ func testIterativeRandomSync(t *testing.T, count int) { results = append(results, SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := diskdb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -251,8 +257,10 @@ func TestIterativeRandomDelayedSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := diskdb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -298,8 +306,10 @@ func TestDuplicateAvoidanceSync(t *testing.T) { results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := diskdb.NewBatch() if err := sched.Commit(batch); err != nil { @@ -336,8 +346,10 @@ func TestIncompleteSync(t *testing.T) { results[i] = SyncResult{hash, data} } // Process each of the trie nodes - if _, index, err := sched.Process(results); err != nil { - t.Fatalf("failed to process result #%d: %v", index, err) + for _, result := range results { + if err := sched.Process(result); err != nil { + t.Fatalf("failed to process result %v", err) + } } batch := diskdb.NewBatch() if err := sched.Commit(batch); err != nil { diff --git a/trie/trie.go b/trie/trie.go index 78e2eff534..26c3f2c29b 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -473,3 +473,9 @@ func (t *Trie) hashRoot(db *Database) (node, node, error) { t.unhashed = 0 return hashed, cached, nil } + +// Reset drops the referenced root node and cleans all internal state. +func (t *Trie) Reset() { + t.root = nil + t.unhashed = 0 +}