From 58d1988349b94d168ffc46d60c6d018564d57dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 3 Dec 2021 12:32:41 +0200 Subject: [PATCH] core, eth, les, trie: remove the sync bloom, used by fast sync --- core/state/sync.go | 4 +- core/state/sync_test.go | 13 +- eth/downloader/downloader.go | 25 +--- eth/downloader/downloader_test.go | 2 +- eth/handler.go | 11 +- eth/handler_eth.go | 6 +- eth/handler_eth_test.go | 2 - eth/protocols/eth/handler.go | 4 - eth/protocols/eth/handler_test.go | 6 +- eth/protocols/eth/handlers.go | 8 +- eth/protocols/snap/sync.go | 2 +- les/client_handler.go | 2 +- les/downloader/downloader.go | 25 +--- les/downloader/downloader_test.go | 2 +- les/downloader/statesync.go | 2 +- trie/sync.go | 57 +++------ trie/sync_bloom.go | 191 ------------------------------ trie/sync_test.go | 16 +-- 18 files changed, 49 insertions(+), 329 deletions(-) delete mode 100644 trie/sync_bloom.go diff --git a/core/state/sync.go b/core/state/sync.go index 734961d9c..cc7d01a21 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -27,7 +27,7 @@ import ( ) // NewStateSync create a new state trie download scheduler. -func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync { +func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync { // Register the storage slot callback if the external callback is specified. var onSlot func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error if onLeaf != nil { @@ -52,6 +52,6 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.S syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), hexpath, parent) return nil } - syncer = trie.NewSync(root, database, onAccount, bloom) + syncer = trie.NewSync(root, database, onAccount) return syncer } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index beb8fcfd9..007590c76 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -134,7 +133,7 @@ func checkStateConsistency(db ethdb.Database, root common.Hash) error { // Tests that an empty state is not scheduled for syncing. func TestEmptyStateSync(t *testing.T) { empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") - sync := NewStateSync(empty, rawdb.NewMemoryDatabase(), trie.NewSyncBloom(1, memorydb.New()), nil) + sync := NewStateSync(empty, rawdb.NewMemoryDatabase(), nil) if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 { t.Errorf(" content requested for empty state: %v, %v, %v", nodes, paths, codes) } @@ -171,7 +170,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) nodes, paths, codes := sched.Missing(count) var ( @@ -250,7 +249,7 @@ func TestIterativeDelayedStateSync(t *testing.T) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) nodes, _, codes := sched.Missing(0) queue := append(append([]common.Hash{}, nodes...), codes...) @@ -298,7 +297,7 @@ func testIterativeRandomStateSync(t *testing.T, count int) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(count) @@ -348,7 +347,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(0) @@ -415,7 +414,7 @@ func TestIncompleteStateSync(t *testing.T) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) var added []common.Hash diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6b262b5ec..28ad18b81 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -36,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" ) var ( @@ -101,8 +100,7 @@ type Downloader struct { queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed - stateDB ethdb.Database // Database to state sync into (and deduplicate via) - stateBloom *trie.SyncBloom // Bloom filter for snap trie node and contract code existence checks + stateDB ethdb.Database // Database to state sync into (and deduplicate via) // Statistics syncStatsChainOrigin uint64 // Origin block number where syncing started at @@ -203,13 +201,12 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ stateDB: stateDb, - stateBloom: stateBloom, mux: mux, checkpoint: checkpoint, queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), @@ -365,12 +362,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { log.Info("Block synchronisation started") } - // If we are already full syncing, but have a snap-sync bloom filter laying - // around, make sure it doesn't use memory any more. This is a special case - // when the user attempts to snap sync a new empty network. - if mode == FullSync && d.stateBloom != nil { - d.stateBloom.Close() - } // If snap sync was requested, create the snap scheduler and switch to snap // sync mode. Long term we could drop snap sync or merge the two together, // but until snap becomes prevalent, we should support both. TODO(karalabe). @@ -612,9 +603,6 @@ func (d *Downloader) Terminate() { default: close(d.quitCh) } - if d.stateBloom != nil { - d.stateBloom.Close() - } d.quitLock.Unlock() // Cancel any pending download requests @@ -1599,15 +1587,6 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { return err } atomic.StoreInt32(&d.committed, 1) - - // If we had a bloom filter for the state sync, deallocate it now. Note, we only - // deallocate internally, but keep the empty wrapper. This ensures that if we do - // a rollback after committing the pivot and restarting snap sync, we don't end - // up using a nil bloom. Empty bloom is fine, it just returns that it does not - // have the info we need, so reach down to the database instead. - if d.stateBloom != nil { - d.stateBloom.Close() - } return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index f62a5d028..3e78a0bb7 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,7 +75,7 @@ func newTester() *downloadTester { chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(0, db, trie.NewSyncBloom(1, db), new(event.TypeMux), tester.chain, nil, tester.dropPeer) + tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer) return tester } diff --git a/eth/handler.go b/eth/handler.go index 26a90495b..55ca869c7 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -39,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" ) const ( @@ -106,7 +105,6 @@ type handler struct { maxPeers int downloader *downloader.Downloader - stateBloom *trie.SyncBloom blockFetcher *fetcher.BlockFetcher txFetcher *fetcher.TxFetcher peers *peerSet @@ -176,14 +174,7 @@ func newHandler(config *handlerConfig) (*handler, error) { // Construct the downloader (long sync) and its backing state bloom if snap // sync is requested. The downloader is responsible for deallocating the state // bloom when it's done. - // Note: we don't enable it if snap-sync is performed, since it's very heavy - // and the heal-portion of the snap sync is much lighter than snap. What we particularly - // want to avoid, is a 90%-finished (but restarted) snap-sync to begin - // indexing the entire trie - if atomic.LoadUint32(&h.snapSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 { - h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database) - } - h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer) + h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer) // Construct the fetcher (short sync) validator := func(header *types.Header) error { diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 335558249..bfe95e8c4 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -27,16 +27,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/trie" ) // ethHandler implements the eth.Backend interface to handle the various network // packets that are sent as replies or broadcasts. type ethHandler handler -func (h *ethHandler) Chain() *core.BlockChain { return h.chain } -func (h *ethHandler) StateBloom() *trie.SyncBloom { return h.stateBloom } -func (h *ethHandler) TxPool() eth.TxPool { return h.txpool } +func (h *ethHandler) Chain() *core.BlockChain { return h.chain } +func (h *ethHandler) TxPool() eth.TxPool { return h.txpool } // RunPeer is invoked when a peer joins on the `eth` protocol. func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 311fbac9c..b826ed7a9 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" ) // testEthHandler is a mock event handler to listen for inbound network requests @@ -50,7 +49,6 @@ type testEthHandler struct { } func (h *testEthHandler) Chain() *core.BlockChain { panic("no backing chain") } -func (h *testEthHandler) StateBloom() *trie.SyncBloom { panic("no backing state bloom") } func (h *testEthHandler) TxPool() eth.TxPool { panic("no backing tx pool") } func (h *testEthHandler) AcceptTxs() bool { return true } func (h *testEthHandler) RunPeer(*eth.Peer, eth.Handler) error { panic("not used in tests") } diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 44fa18c4b..6e0fc4a37 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" ) const ( @@ -69,9 +68,6 @@ type Backend interface { // Chain retrieves the blockchain object to serve data. Chain() *core.BlockChain - // StateBloom retrieves the bloom filter - if any - for state trie nodes. - StateBloom() *trie.SyncBloom - // TxPool retrieves the transaction pool object to serve data. TxPool() TxPool diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 66f013409..5192f043d 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" ) var ( @@ -91,9 +90,8 @@ func (b *testBackend) close() { b.chain.Stop() } -func (b *testBackend) Chain() *core.BlockChain { return b.chain } -func (b *testBackend) StateBloom() *trie.SyncBloom { return nil } -func (b *testBackend) TxPool() TxPool { return b.txpool } +func (b *testBackend) Chain() *core.BlockChain { return b.chain } +func (b *testBackend) TxPool() TxPool { return b.txpool } func (b *testBackend) RunPeer(peer *Peer, handler Handler) error { // Normally the backend would do peer mainentance and handshakes. All that diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 0ed8a8eba..503e572a8 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -164,13 +164,13 @@ func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(&query); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } - response := ServiceGetNodeDataQuery(backend.Chain(), backend.StateBloom(), query.GetNodeDataPacket) + response := ServiceGetNodeDataQuery(backend.Chain(), query.GetNodeDataPacket) return peer.ReplyNodeData(query.RequestId, response) } // ServiceGetNodeDataQuery assembles the response to a node data query. It is // exposed to allow external packages to test protocol behavior. -func ServiceGetNodeDataQuery(chain *core.BlockChain, bloom *trie.SyncBloom, query GetNodeDataPacket) [][]byte { +func ServiceGetNodeDataQuery(chain *core.BlockChain, query GetNodeDataPacket) [][]byte { // Gather state data until the fetch or network limits is reached var ( bytes int @@ -182,10 +182,6 @@ func ServiceGetNodeDataQuery(chain *core.BlockChain, bloom *trie.SyncBloom, quer break } // Retrieve the requested state entry - if bloom != nil && !bloom.Contains(hash[:]) { - // Only lookup the trie node if there's chance that we actually have it - continue - } entry, err := chain.TrieNode(hash) if len(entry) == 0 || err != nil { // Read the contract code with prefix only to save unnecessary lookups. diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index df364b183..be8644a5a 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -546,7 +546,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { s.lock.Lock() s.root = root s.healer = &healTask{ - scheduler: state.NewStateSync(root, s.db, nil, s.onHealState), + scheduler: state.NewStateSync(root, s.db, s.onHealState), trieTasks: make(map[common.Hash]trie.SyncPath), codeTasks: make(map[common.Hash]struct{}), } diff --git a/les/client_handler.go b/les/client_handler.go index db5eb8a64..e416f92e2 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -74,7 +74,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1 } handler.fetcher = newLightFetcher(backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise) - handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer) + handler.downloader = downloader.New(height, backend.chainDb, backend.eventMux, nil, backend.blockchain, handler.removePeer) handler.backend.peers.subscribe((*downloaderPeerNotify)(handler)) return handler } diff --git a/les/downloader/downloader.go b/les/downloader/downloader.go index 722077a5f..448a94192 100644 --- a/les/downloader/downloader.go +++ b/les/downloader/downloader.go @@ -40,7 +40,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" ) var ( @@ -97,8 +96,7 @@ type Downloader struct { queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed - stateDB ethdb.Database // Database to state sync into (and deduplicate via) - stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks + stateDB ethdb.Database // Database to state sync into (and deduplicate via) // Statistics syncStatsChainOrigin uint64 // Origin block number where syncing started at @@ -207,13 +205,12 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ stateDB: stateDb, - stateBloom: stateBloom, mux: mux, checkpoint: checkpoint, queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), @@ -367,12 +364,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { log.Info("Block synchronisation started") } - // If we are already full syncing, but have a fast-sync bloom filter laying - // around, make sure it doesn't use memory any more. This is a special case - // when the user attempts to fast sync a new empty network. - if mode == FullSync && d.stateBloom != nil { - d.stateBloom.Close() - } // If snap sync was requested, create the snap scheduler and switch to fast // sync mode. Long term we could drop fast sync or merge the two together, // but until snap becomes prevalent, we should support both. TODO(karalabe). @@ -628,9 +619,6 @@ func (d *Downloader) Terminate() { default: close(d.quitCh) } - if d.stateBloom != nil { - d.stateBloom.Close() - } d.quitLock.Unlock() // Cancel any pending download requests @@ -1930,15 +1918,6 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { return err } atomic.StoreInt32(&d.committed, 1) - - // If we had a bloom filter for the state sync, deallocate it now. Note, we only - // deallocate internally, but keep the empty wrapper. This ensures that if we do - // a rollback after committing the pivot and restarting fast sync, we don't end - // up using a nil bloom. Empty bloom is fine, it just returns that it does not - // have the info we need, so reach down to the database instead. - if d.stateBloom != nil { - d.stateBloom.Close() - } return nil } diff --git a/les/downloader/downloader_test.go b/les/downloader/downloader_test.go index 8736de39d..69bdb90ed 100644 --- a/les/downloader/downloader_test.go +++ b/les/downloader/downloader_test.go @@ -89,7 +89,7 @@ func newTester() *downloadTester { tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer) + tester.downloader = New(0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) return tester } diff --git a/les/downloader/statesync.go b/les/downloader/statesync.go index bc396fffb..2b3278822 100644 --- a/les/downloader/statesync.go +++ b/les/downloader/statesync.go @@ -297,7 +297,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync { return &stateSync{ d: d, root: root, - sched: state.NewStateSync(root, d.stateDB, d.stateBloom, nil), + sched: state.NewStateSync(root, d.stateDB, nil), keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState), trieTasks: make(map[common.Hash]*trieTask), codeTasks: make(map[common.Hash]*codeTask), diff --git a/trie/sync.go b/trie/sync.go index 3a6076ff8..d6e435f93 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -128,11 +128,10 @@ type Sync struct { codeReqs map[common.Hash]*request // Pending requests pertaining to a code hash queue *prque.Prque // Priority queue with the pending requests fetches map[int]int // Number of active fetches per trie node depth - bloom *SyncBloom // Bloom filter for fast state existence checks } // NewSync creates a new trie data download scheduler. -func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, bloom *SyncBloom) *Sync { +func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback) *Sync { ts := &Sync{ database: database, membatch: newSyncMemBatch(), @@ -140,7 +139,6 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb codeReqs: make(map[common.Hash]*request), queue: prque.New(nil), fetches: make(map[int]int), - bloom: bloom, } ts.AddSubTrie(root, nil, common.Hash{}, callback) return ts @@ -155,16 +153,11 @@ func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, cal if s.membatch.hasNode(root) { return } - if s.bloom == nil || s.bloom.Contains(root[:]) { - // Bloom filter says this might be a duplicate, double check. - // If database says yes, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - blob := rawdb.ReadTrieNode(s.database, root) - if len(blob) > 0 { - return - } - // False positive, bump fault meter - bloomFaultMeter.Mark(1) + // If database says this is a duplicate, then at least the trie node is + // present, and we hold the assumption that it's NOT legacy contract code. + blob := rawdb.ReadTrieNode(s.database, root) + if len(blob) > 0 { + return } // Assemble the new sub-trie sync request req := &request{ @@ -195,18 +188,13 @@ func (s *Sync) AddCodeEntry(hash common.Hash, path []byte, parent common.Hash) { if s.membatch.hasCode(hash) { return } - if s.bloom == nil || s.bloom.Contains(hash[:]) { - // Bloom filter says this might be a duplicate, double check. - // If database says yes, the blob is present for sure. - // Note we only check the existence with new code scheme, fast - // sync is expected to run with a fresh new node. Even there - // exists the code with legacy format, fetch and store with - // new scheme anyway. - if blob := rawdb.ReadCodeWithPrefix(s.database, hash); len(blob) > 0 { - return - } - // False positive, bump fault meter - bloomFaultMeter.Mark(1) + // If database says duplicate, the blob is present for sure. + // Note we only check the existence with new code scheme, fast + // sync is expected to run with a fresh new node. Even there + // exists the code with legacy format, fetch and store with + // new scheme anyway. + if blob := rawdb.ReadCodeWithPrefix(s.database, hash); len(blob) > 0 { + return } // Assemble the new sub-trie sync request req := &request{ @@ -313,15 +301,9 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { // Dump the membatch into a database dbw for key, value := range s.membatch.nodes { rawdb.WriteTrieNode(dbw, key, value) - if s.bloom != nil { - s.bloom.Add(key[:]) - } } for key, value := range s.membatch.codes { rawdb.WriteCode(dbw, key, value) - if s.bloom != nil { - s.bloom.Add(key[:]) - } } // Drop the membatch data and return s.membatch = newSyncMemBatch() @@ -417,15 +399,10 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { if s.membatch.hasNode(hash) { continue } - if s.bloom == nil || s.bloom.Contains(node) { - // Bloom filter says this might be a duplicate, double check. - // If database says yes, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - if blob := rawdb.ReadTrieNode(s.database, hash); len(blob) > 0 { - continue - } - // False positive, bump fault meter - bloomFaultMeter.Mark(1) + // If database says duplicate, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + if blob := rawdb.ReadTrieNode(s.database, hash); len(blob) > 0 { + continue } // Locally unknown node, schedule for retrieval requests = append(requests, &request{ diff --git a/trie/sync_bloom.go b/trie/sync_bloom.go deleted file mode 100644 index 91e5e6711..000000000 --- a/trie/sync_bloom.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package trie - -import ( - "encoding/binary" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" - bloomfilter "github.com/holiman/bloomfilter/v2" -) - -var ( - bloomAddMeter = metrics.NewRegisteredMeter("trie/bloom/add", nil) - bloomLoadMeter = metrics.NewRegisteredMeter("trie/bloom/load", nil) - bloomTestMeter = metrics.NewRegisteredMeter("trie/bloom/test", nil) - bloomMissMeter = metrics.NewRegisteredMeter("trie/bloom/miss", nil) - bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil) - bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil) -) - -// SyncBloom is a bloom filter used during fast sync to quickly decide if a trie -// node or contract code already exists on disk or not. It self populates from the -// provided disk database on creation in a background thread and will only start -// returning live results once that's finished. -type SyncBloom struct { - bloom *bloomfilter.Filter - inited uint32 - closer sync.Once - closed uint32 - pend sync.WaitGroup - closeCh chan struct{} -} - -// NewSyncBloom creates a new bloom filter of the given size (in megabytes) and -// initializes it from the database. The bloom is hard coded to use 3 filters. -func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom { - // Create the bloom filter to track known trie nodes - bloom, err := bloomfilter.New(memory*1024*1024*8, 4) - if err != nil { - panic(fmt.Sprintf("failed to create bloom: %v", err)) - } - log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024)) - - // Assemble the fast sync bloom and init it from previous sessions - b := &SyncBloom{ - bloom: bloom, - closeCh: make(chan struct{}), - } - b.pend.Add(2) - go func() { - defer b.pend.Done() - b.init(database) - }() - go func() { - defer b.pend.Done() - b.meter() - }() - return b -} - -// init iterates over the database, pushing every trie hash into the bloom filter. -func (b *SyncBloom) init(database ethdb.Iteratee) { - // Iterate over the database, but restart every now and again to avoid holding - // a persistent snapshot since fast sync can push a ton of data concurrently, - // bloating the disk. - // - // Note, this is fine, because everything inserted into leveldb by fast sync is - // also pushed into the bloom directly, so we're not missing anything when the - // iterator is swapped out for a new one. - it := database.NewIterator(nil, nil) - - var ( - start = time.Now() - swap = time.Now() - ) - for it.Next() && atomic.LoadUint32(&b.closed) == 0 { - // If the database entry is a trie node, add it to the bloom - key := it.Key() - if len(key) == common.HashLength { - b.bloom.AddHash(binary.BigEndian.Uint64(key)) - bloomLoadMeter.Mark(1) - } else if ok, hash := rawdb.IsCodeKey(key); ok { - // If the database entry is a contract code, add it to the bloom - b.bloom.AddHash(binary.BigEndian.Uint64(hash)) - bloomLoadMeter.Mark(1) - } - // If enough time elapsed since the last iterator swap, restart - if time.Since(swap) > 8*time.Second { - key := common.CopyBytes(it.Key()) - - it.Release() - it = database.NewIterator(nil, key) - - log.Info("Initializing state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start))) - swap = time.Now() - } - } - it.Release() - - // Mark the bloom filter inited and return - log.Info("Initialized state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start))) - atomic.StoreUint32(&b.inited, 1) -} - -// meter periodically recalculates the false positive error rate of the bloom -// filter and reports it in a metric. -func (b *SyncBloom) meter() { - // check every second - tick := time.NewTicker(1 * time.Second) - defer tick.Stop() - - for { - select { - case <-tick.C: - // Report the current error ration. No floats, lame, scale it up. - bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000)) - case <-b.closeCh: - return - } - } -} - -// Close terminates any background initializer still running and releases all the -// memory allocated for the bloom. -func (b *SyncBloom) Close() error { - b.closer.Do(func() { - // Ensure the initializer is stopped - atomic.StoreUint32(&b.closed, 1) - close(b.closeCh) - b.pend.Wait() - - // Wipe the bloom, but mark it "uninited" just in case someone attempts an access - log.Info("Deallocated state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability()) - - atomic.StoreUint32(&b.inited, 0) - b.bloom = nil - }) - return nil -} - -// Add inserts a new trie node hash into the bloom filter. -func (b *SyncBloom) Add(hash []byte) { - if atomic.LoadUint32(&b.closed) == 1 { - return - } - b.bloom.AddHash(binary.BigEndian.Uint64(hash)) - bloomAddMeter.Mark(1) -} - -// Contains tests if the bloom filter contains the given hash: -// - false: the bloom definitely does not contain hash -// - true: the bloom maybe contains hash -// -// While the bloom is being initialized, any query will return true. -func (b *SyncBloom) Contains(hash []byte) bool { - bloomTestMeter.Mark(1) - if atomic.LoadUint32(&b.inited) == 0 { - // We didn't load all the trie nodes from the previous run of Geth yet. As - // such, we can't say for sure if a hash is not present for anything. Until - // the init is done, we're faking "possible presence" for everything. - return true - } - // Bloom initialized, check the real one and report any successful misses - maybe := b.bloom.ContainsHash(binary.BigEndian.Uint64(hash)) - if !maybe { - bloomMissMeter.Mark(1) - } - return maybe -} diff --git a/trie/sync_test.go b/trie/sync_test.go index cb3283875..970730b67 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -95,7 +95,7 @@ func TestEmptySync(t *testing.T) { emptyB, _ := New(emptyRoot, dbB) for i, trie := range []*Trie{emptyA, emptyB} { - sync := NewSync(trie.Hash(), memorydb.New(), nil, NewSyncBloom(1, memorydb.New())) + sync := NewSync(trie.Hash(), memorydb.New(), nil) if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 { t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, nodes, paths, codes) } @@ -116,7 +116,7 @@ func testIterativeSync(t *testing.T, count int, bypath bool) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, paths, codes := sched.Missing(count) var ( @@ -177,7 +177,7 @@ func TestIterativeDelayedSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, _, codes := sched.Missing(10000) queue := append(append([]common.Hash{}, nodes...), codes...) @@ -223,7 +223,7 @@ func testIterativeRandomSync(t *testing.T, count int) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(count) @@ -271,7 +271,7 @@ func TestIterativeRandomDelayedSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(10000) @@ -324,7 +324,7 @@ func TestDuplicateAvoidanceSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, _, codes := sched.Missing(0) queue := append(append([]common.Hash{}, nodes...), codes...) @@ -371,7 +371,7 @@ func TestIncompleteSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) var added []common.Hash @@ -431,7 +431,7 @@ func TestSyncOrdering(t *testing.T) { // Create a destination trie and sync with the scheduler, tracking the requests diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, paths, _ := sched.Missing(1) queue := append([]common.Hash{}, nodes...)