From 45f8304f3c44e5379c7e30ab144d73e591e270af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 7 May 2015 12:59:19 +0300 Subject: [PATCH] eth/downloader: fix expiration not running while fetching --- eth/downloader/downloader.go | 40 ++++++++++++++++-------------------- eth/downloader/queue.go | 26 ++++++++++++----------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 608acf499a..25b2511127 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -346,13 +346,28 @@ out: d.peers.setState(blockPack.peerId, idleState) } case <-ticker.C: - // after removing bad peers make sure we actually have sufficient peer left to keep downloading + // Check for bad peers. Bad peers may indicate a peer not responding + // to a `getBlocks` message. A timeout of 5 seconds is set. Peers + // that badly or poorly behave are removed from the peer set (not banned). + // Bad peers are excluded from the available peer set and therefor won't be + // reused. XXX We could re-introduce peers after X time. + badPeers := d.queue.Expire(blockTtl) + for _, pid := range badPeers { + // XXX We could make use of a reputation system here ranking peers + // in their performance + // 1) Time for them to respond; + // 2) Measure their speed; + // 3) Amount and availability. + if peer := d.peers[pid]; peer != nil { + peer.demote() + peer.reset() + } + } + // After removing bad peers make sure we actually have sufficient peer left to keep downloading if len(d.peers) == 0 { d.queue.Reset() - return errNoPeers } - // If there are unrequested hashes left start fetching // from the available peers. if d.queue.Pending() > 0 { @@ -392,25 +407,6 @@ out: // safely assume we're done. Another part of the process will check // for parent errors and will re-request anything that's missing break out - } else { - // Check for bad peers. Bad peers may indicate a peer not responding - // to a `getBlocks` message. A timeout of 5 seconds is set. Peers - // that badly or poorly behave are removed from the peer set (not banned). - // Bad peers are excluded from the available peer set and therefor won't be - // reused. XXX We could re-introduce peers after X time. - badPeers := d.queue.Expire(blockTtl) - for _, pid := range badPeers { - // XXX We could make use of a reputation system here ranking peers - // in their performance - // 1) Time for them to respond; - // 2) Measure their speed; - // 3) Amount and availability. - if peer := d.peers[pid]; peer != nil { - peer.demote() - peer.reset() - } - } - } } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index eae5670521..d849d4d68a 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -12,7 +12,7 @@ import ( ) const ( - blockCacheLimit = 4096 // Maximum number of blocks to cache before throttling the download + blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download ) // fetchRequest is a currently running block retrieval operation. @@ -28,8 +28,7 @@ type queue struct { hashQueue *prque.Prque // Priority queue of the block hashes to fetch hashCounter int // Counter indexing the added hashes to ensure retrieval order - pendPool map[string]*fetchRequest // Currently pending block retrieval operations - pendCount int // Number of pending block fetches (to throttle the download) + pendPool map[string]*fetchRequest // Currently pending block retrieval operations blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes blockCache []*types.Block // Downloaded but not yet delivered blocks @@ -58,7 +57,6 @@ func (q *queue) Reset() { q.hashCounter = 0 q.pendPool = make(map[string]*fetchRequest) - q.pendCount = 0 q.blockPool = make(map[common.Hash]int) q.blockOffset = 0 @@ -106,7 +104,13 @@ func (q *queue) Throttle() bool { q.lock.RLock() defer q.lock.RUnlock() - return q.pendCount >= len(q.blockCache)-len(q.blockPool) + // Calculate the currently in-flight block requests + pending := 0 + for _, request := range q.pendPool { + pending += len(request.Hashes) + } + // Throttle if more blocks are in-flight than free space in the cache + return pending >= len(q.blockCache)-len(q.blockPool) } // Has checks if a hash is within the download queue or not. @@ -206,10 +210,14 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the pool has been depleted + // Short circuit if the pool has been depleted, or if the peer's already + // downloading something (sanity check not to corrupt state) if q.hashQueue.Empty() { return nil } + if _, ok := q.pendPool[p.id]; ok { + return nil + } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) @@ -236,7 +244,6 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest { Time: time.Now(), } q.pendPool[p.id] = request - q.pendCount += len(request.Hashes) return request } @@ -250,7 +257,6 @@ func (q *queue) Cancel(request *fetchRequest) { q.hashQueue.Push(hash, float32(index)) } delete(q.pendPool, request.Peer.id) - q.pendCount -= len(request.Hashes) } // Expire checks for in flight requests that exceeded a timeout allowance, @@ -266,7 +272,6 @@ func (q *queue) Expire(timeout time.Duration) []string { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } - q.pendCount -= len(request.Hashes) peers = append(peers, id) } } @@ -289,9 +294,6 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } delete(q.pendPool, id) - // Mark all the hashes in the request as non-pending - q.pendCount -= len(request.Hashes) - // If no blocks were retrieved, mark them as unavailable for the origin peer if len(blocks) == 0 { for hash, _ := range request.Hashes {