From 4f1d92b3329572d75a20b9f9e1cccdf74aa7c79f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 27 May 2016 14:26:00 +0300 Subject: [PATCH] eth/downloader, trie: pull head state concurrently with chain --- eth/downloader/downloader.go | 56 ++++++++++++++++++------------------ eth/downloader/queue.go | 8 +++++- trie/sync.go | 7 ++++- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 74bff2b66..8cb0d21f7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -35,6 +35,7 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" ) @@ -114,7 +115,6 @@ type Downloader struct { // Statistics syncStatsChainOrigin uint64 // Origin block number where syncing started at syncStatsChainHeight uint64 // Highest block number known when syncing started - syncStatsStateTotal uint64 // Total number of node state entries known so far syncStatsStateDone uint64 // Number of state trie entries already pulled syncStatsLock sync.RWMutex // Lock protecting the sync stats fields @@ -321,12 +321,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode empty = true } } - // Reset any ephemeral sync statistics - d.syncStatsLock.Lock() - d.syncStatsStateTotal = 0 - d.syncStatsStateDone = 0 - d.syncStatsLock.Unlock() - // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -382,7 +376,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsLock.Unlock() // Initiate the sync using a concurrent hash and block retrieval algorithm - d.queue.Prepare(origin+1, d.mode, 0) + d.queue.Prepare(origin+1, d.mode, 0, nil) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } @@ -397,7 +391,9 @@ func 
(d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if err != nil { return err } - origin, err := d.findAncestor(p, latest) + height := latest.Number.Uint64() + + origin, err := d.findAncestor(p, height) if err != nil { return err } @@ -405,22 +401,22 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { d.syncStatsChainOrigin = origin } - d.syncStatsChainHeight = latest + d.syncStatsChainHeight = height d.syncStatsLock.Unlock() // Initiate the sync using a concurrent header and content retrieval algorithm pivot := uint64(0) switch d.mode { case LightSync: - pivot = latest + pivot = height case FastSync: // Calculate the new fast/slow sync pivot point pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) if err != nil { panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) } - if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { - pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { + pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() } // If the point is below the origin, move origin back to ensure state download if pivot < origin { @@ -432,9 +428,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e } glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) } - d.queue.Prepare(origin+1, d.mode, pivot) + d.queue.Prepare(origin+1, d.mode, pivot, latest) if d.syncInitHook != nil { - d.syncInitHook(origin, latest) + d.syncInitHook(origin, height) } return d.spawnSync(origin+1, func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved @@ -952,7 +948,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // fetchHeight retrieves the head header of the remote peer to aid in estimating // the total time a pending synchronisation would take. 
-func (d *Downloader) fetchHeight(p *peer) (uint64, error) { +func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p) // Request the advertised remote head block and wait for the response @@ -962,7 +958,7 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { for { select { case <-d.cancelCh: - return 0, errCancelBlockFetch + return nil, errCancelBlockFetch case packet := <-d.headerCh: // Discard anything not from the origin peer @@ -974,13 +970,13 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { headers := packet.(*headerPack).headers if len(headers) != 1 { glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers)) - return 0, errBadPeer + return nil, errBadPeer } - return headers[0].Number.Uint64(), nil + return headers[0], nil case <-timeout: glog.V(logger.Debug).Infof("%v: head header timeout", p) - return 0, errTimeout + return nil, errTimeout case <-d.bodyCh: case <-d.stateCh: @@ -1369,10 +1365,10 @@ func (d *Downloader) fetchNodeData() error { deliver = func(packet dataPack) (int, error) { start := time.Now() return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { - // If the peer gave us nothing, stalling fast sync, drop - if delivered == 0 { - glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId()) - d.dropPeer(packet.PeerId()) + // If the peer returned old-requested data, forgive + if err == trie.ErrNotRequested { + glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId()) + return } if err != nil { // If the node data processing failed, the root hash is very wrong, abort @@ -1381,17 +1377,21 @@ func (d *Downloader) fetchNodeData() error { return } // Processing succeeded, notify state fetcher of continuation - if d.queue.PendingNodeData() > 0 { + pending := d.queue.PendingNodeData() + if pending > 0 
{ select { case d.stateWakeCh <- true: default: } } - // Log a message to the user and return d.syncStatsLock.Lock() - defer d.syncStatsLock.Unlock() d.syncStatsStateDone += uint64(delivered) - glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) + d.syncStatsLock.Unlock() + + // Log a message to the user and return + if delivered > 0 { + glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending) + } }) } expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 195eae4ff..01897af6d 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -1262,13 +1262,19 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { +func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { q.lock.Lock() defer q.lock.Unlock() + // Prepare the queue for sync results if q.resultOffset < offset { q.resultOffset = offset } q.fastSyncPivot = pivot q.mode = mode + + // If long running fast sync, also start up a head state retrieval immediately + if mode == FastSync && pivot > 0 { + q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase) + } } diff --git a/trie/sync.go b/trie/sync.go index d55399d06..6cc6aa706 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -17,6 +17,7 @@ package trie import ( + "errors" "fmt" "github.com/ethereum/go-ethereum/common" @@ -24,6 +25,10 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) +// ErrNotRequested is returned by the trie sync when it's requested to process a +// node it did not request.
+var ErrNotRequested = errors.New("not requested") + // request represents a scheduled or already in-flight state retrieval request. type request struct { hash common.Hash // Hash of the node data content to retrieve @@ -143,7 +148,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) { // If the item was not requested, bail out request := s.requests[item.Hash] if request == nil { - return i, fmt.Errorf("not requested: %x", item.Hash) + return i, ErrNotRequested } // If the item is a raw entry request, commit directly if request.object == nil {