From c58918c84ad6825ca20cc9170b0a79eb1033c50a Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 19 Apr 2015 13:30:34 +0200 Subject: [PATCH] downloader: moved chunk ignoring. Fixes issue with catching up --- eth/downloader/downloader.go | 39 +++++++++++++++++-------------- eth/downloader/downloader_test.go | 6 +++-- eth/downloader/peer.go | 8 +++---- eth/downloader/queue.go | 22 +++++++++++------ 4 files changed, 44 insertions(+), 31 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6768c3e67a..18c4bf4d4a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -18,14 +18,15 @@ import ( ) const ( - maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out - hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out + maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out + hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out ) var ( + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + errLowTd = errors.New("peer's TD is too low") errBusy = errors.New("busy") errUnknownPeer = errors.New("peer's unknown or unhealthy") @@ -127,11 +128,11 @@ out: for { select { case <-d.newPeerCh: - itimer.Stop() // Meet the `minDesiredPeerCount` before we select our best peer if len(d.peers) < minDesiredPeerCount { break } + itimer.Stop() d.selectPeer(d.peers.bestPeer()) case <-itimer.C: @@ -154,17 +155,18 @@ func (d *Downloader) selectPeer(p *peer) { // Make sure it's doing neither. Once done we can restart the // downloading process if the TD is higher. For now just get on // with whatever is going on. This prevents unecessary switching. - if !d.isBusy() { - // selected peer must be better than our own - // XXX we also check the peer's recent hash to make sure we - // don't have it. Some peers report (i think) incorrect TD. - if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { - return - } - - glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) - d.syncCh <- syncPack{p, p.recentHash, false} + if d.isBusy() { + return } + // selected peer must be better than our own + // XXX we also check the peer's recent hash to make sure we + // don't have it. Some peers report (i think) incorrect TD. + if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { + return + } + + glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) + d.syncCh <- syncPack{p, p.recentHash, false} } @@ -282,6 +284,8 @@ out: // If there are unrequested hashes left start fetching // from the available peers. if d.queue.hashPool.Size() > 0 { + was := d.queue.hashPool.Size() + fmt.Println("it was =", was) availablePeers := d.peers.get(idleState) for _, peer := range availablePeers { // Get a possible chunk. If nil is returned no chunk @@ -301,13 +305,14 @@ out: d.queue.put(chunk.hashes) } } + fmt.Println("it is =", d.queue.hashPool.Size()) // make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error if len(d.queue.fetching) == 0 { d.queue.reset() - return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers)) + return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.hashPool.Size()) } } else if len(d.queue.fetching) == 0 { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6cf99b678d..249d8a5339 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -73,7 +73,7 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) error { } func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.HashCh <- dl.hashes + dl.downloader.hashCh <- dl.hashes return nil } @@ -109,6 +109,8 @@ func TestDownload(t *testing.T) { glog.SetV(logger.Detail) glog.SetToStderr(true) + minDesiredPeerCount = 4 + hashes := createHashes(0, 1000) blocks := createBlocksFromHashes(hashes) tester := newTester(t, hashes, blocks) @@ -123,7 +125,7 @@ success: case <-tester.done: break success case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer - t.Error("timout") + t.Error("timeout") } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 7065ca1058..bcb8ad43ab 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -71,7 +71,7 @@ type peer struct { td *big.Int recentHash common.Hash - requested *set.Set + ignored *set.Set getHashes hashFetcherFn getBlocks blockFetcherFn @@ -86,7 +86,7 @@ func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getHashes: getHashes, getBlocks: getBlocks, state: idleState, - requested: set.New(), + ignored: set.New(), } } @@ -99,8 +99,6 @@ func (p *peer) fetch(chunk *chunk) error { return errors.New("peer already fetching chunk") } - p.requested.Merge(chunk.hashes) - // set working state p.state = workingState // convert the set to a fetchable slice @@ -137,5 +135,5 @@ func (p *peer) demote() { func (p *peer) reset() { p.state = idleState - p.requested.Clear() + p.ignored.Clear() } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index ce3aa98509..adbc2a0d01 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -56,16 +56,18 @@ func (c *queue) get(p *peer, max int) *chunk { // Create a new set of hashes hashes, i := set.New(), 0 c.hashPool.Each(func(v interface{}) bool { + // break on limit if i == limit { return false } - - // Skip any hashes that have previously been requested from the peer - if !p.requested.Has(v) { - hashes.Add(v) - i++ + // skip any hashes that have previously been requested from the peer + if p.ignored.Has(v) { + return true } + hashes.Add(v) + i++ + return true }) // if no hashes can be requested return a nil chunk @@ -79,7 +81,7 @@ func (c *queue) get(p *peer, max int) *chunk { // Create a new chunk for the seperated hashes. The time is being used // to reset the chunk (timeout) - chunk := &chunk{hashes, time.Now()} + chunk := &chunk{p, hashes, time.Now()} // register as 'fetching' state c.fetching[p.id] = chunk @@ -111,6 +113,12 @@ func (c *queue) deliver(id string, blocks []*types.Block) { // If the chunk was never requested simply ignore it if chunk != nil { delete(c.fetching, id) + // check the length of the returned blocks. If the length of blocks is 0 + // we'll assume the peer doesn't know about the chain. + if len(blocks) == 0 { + // So we can ignore the blocks we didn't know about + chunk.peer.ignored.Merge(chunk.hashes) + } // seperate the blocks and the hashes blockHashes := chunk.fetchedHashes(blocks) @@ -118,7 +126,6 @@ func (c *queue) deliver(id string, blocks []*types.Block) { c.blockHashes.Merge(blockHashes) // Add the blocks c.blocks = append(c.blocks, blocks...) - // Add back whatever couldn't be delivered c.hashPool.Merge(chunk.hashes) c.fetchPool.Separate(chunk.hashes) @@ -134,6 +141,7 @@ func (c *queue) put(hashes *set.Set) { } type chunk struct { + peer *peer hashes *set.Set itime time.Time }