downloader: updated downloader and fixed issues with catch up

Properly ignore blocks coming from peers not in our peer list (blocked)
and do never request anything from bad peers. Added some checks to
account for blocks known when requesting hashes (missing parents).
pull/730/head
obscuren 10 years ago
parent dff39553d4
commit eda10c7317
  1. 56
      eth/downloader/downloader.go

@ -56,6 +56,7 @@ type blockPack struct {
type syncPack struct { type syncPack struct {
peer *peer peer *peer
hash common.Hash hash common.Hash
ignoreInitial bool
} }
func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader { func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader {
@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) {
func (d *Downloader) peerHandler() { func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount` // itimer is used to determine when to start ignoring `minDesiredPeerCount`
itimer := time.NewTicker(5 * time.Second) //itimer := time.NewTicker(5 * time.Second)
itimer := time.NewTimer(5 * time.Second)
out: out:
for { for {
select { select {
case <-d.newPeerCh: case <-d.newPeerCh:
itimer.Stop()
// Meet the `minDesiredPeerCount` before we select our best peer // Meet the `minDesiredPeerCount` before we select our best peer
if len(d.peers) < minDesiredPeerCount { if len(d.peers) < minDesiredPeerCount {
break break
@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) {
} }
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
d.syncCh <- syncPack{p, p.recentHash} d.syncCh <- syncPack{p, p.recentHash, false}
} }
} }
@ -151,7 +154,7 @@ out:
// Start the fetcher. This will block the update entirely // Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels // interupts need to be send to the appropriate channels
// respectively. // respectively.
if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil { if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
// handle error // handle error
glog.V(logger.Debug).Infoln("Error fetching hashes:", err) glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
// XXX Reset // XXX Reset
@ -178,11 +181,18 @@ out:
} }
// XXX Make synchronous // XXX Make synchronous
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error { func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
glog.V(logger.Debug).Infoln("Downloading hashes") glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
start := time.Now() start := time.Now()
// We ignore the initial hash in some cases (e.g. we received a block without it's parent)
// In such circumstances we don't need to download the block so don't add it to the queue.
if !ignoreInitial {
// Add the hash to the queue first
d.queue.hashPool.Add(hash)
}
// Get the first batch of hashes // Get the first batch of hashes
p.getHashes(hash) p.getHashes(hash)
atomic.StoreInt32(&d.fetchingHashes, 1) atomic.StoreInt32(&d.fetchingHashes, 1)
@ -195,7 +205,7 @@ out:
hashSet := set.New() hashSet := set.New()
for _, hash := range hashes { for _, hash := range hashes {
if d.hasBlock(hash) { if d.hasBlock(hash) {
glog.V(logger.Debug).Infof("Found common hash %x\n", hash) glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
done = true done = true
break break
@ -207,7 +217,7 @@ out:
// Add hashes to the chunk set // Add hashes to the chunk set
// Check if we're done fetching // Check if we're done fetching
if !done { if !done && len(hashes) > 0 {
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size()) //fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
// Get the next set of hashes // Get the next set of hashes
p.getHashes(hashes[len(hashes)-1]) p.getHashes(hashes[len(hashes)-1])
@ -218,7 +228,7 @@ out:
} }
} }
} }
glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start)) glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
return nil return nil
} }
@ -242,6 +252,10 @@ out:
// from the available peers. // from the available peers.
if d.queue.hashPool.Size() > 0 { if d.queue.hashPool.Size() > 0 {
availablePeers := d.peers.get(idleState) availablePeers := d.peers.get(idleState)
if len(availablePeers) == 0 {
glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers))
}
for _, peer := range availablePeers { for _, peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk // Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available. // could be returned due to no hashes available.
@ -317,22 +331,34 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
return return
} }
glog.V(logger.Detail).Infoln("Inserting new block from:", id) peer := d.peers.getPeer(id)
d.queue.addBlock(id, block, td)
// if the peer is in our healthy list of peers; update the td // if the peer is in our healthy list of peers; update the td
// here is a good chance to add the peer back to the list // and add the block. Otherwise just ignore it
if peer := d.peers.getPeer(id); peer != nil { if peer == nil {
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
return
}
peer.mu.Lock() peer.mu.Lock()
peer.td = td peer.td = td
peer.recentHash = block.Hash() peer.recentHash = block.Hash()
peer.mu.Unlock() peer.mu.Unlock()
}
glog.V(logger.Detail).Infoln("Inserting new block from:", id)
d.queue.addBlock(id, block, td)
// if neither go ahead to process // if neither go ahead to process
if !(d.isFetchingHashes() || d.isDownloadingBlocks()) { if !(d.isFetchingHashes() || d.isDownloadingBlocks()) {
// Check if the parent of the received block is known.
// If the block is not know, request it otherwise, request.
phash := block.ParentHash()
if !d.hasBlock(phash) {
glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
d.syncCh <- syncPack{peer, peer.recentHash, true}
} else {
d.process() d.process()
} }
}
} }
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by // Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
@ -369,7 +395,7 @@ func (d *Downloader) process() error {
// TODO change this. This shite // TODO change this. This shite
for i, block := range blocks[:max] { for i, block := range blocks[:max] {
if !d.hasBlock(block.ParentHash()) { if !d.hasBlock(block.ParentHash()) {
d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()} d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true}
// remove processed blocks // remove processed blocks
blocks = blocks[i:] blocks = blocks[i:]

Loading…
Cancel
Save