@@ -53,6 +53,11 @@ type syncPack struct {
 	ignoreInitial bool
 }
 
+type hashPack struct {
+	peerId string
+	hashes []common.Hash
+}
+
 type Downloader struct {
 	mu    sync.RWMutex
 	queue *queue
@@ -69,7 +74,7 @@ type Downloader struct {
 
 	// Channels
 	newPeerCh chan *peer
-	hashCh    chan []common.Hash
+	hashCh    chan hashPack
 	blockCh   chan blockPack
 }
 
@@ -80,7 +85,7 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
 		hasBlock:  hasBlock,
 		getBlock:  getBlock,
 		newPeerCh: make(chan *peer, 1),
-		hashCh:    make(chan []common.Hash, 1),
+		hashCh:    make(chan hashPack, 1),
 		blockCh:   make(chan blockPack, 1),
 	}
 
@@ -235,15 +240,15 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
 }
 
 // XXX Make synchronous
-func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
+func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
 	atomic.StoreInt32(&d.fetchingHashes, 1)
 	defer atomic.StoreInt32(&d.fetchingHashes, 0)
 
-	if d.queue.has(hash) {
+	if d.queue.has(h) {
 		return errAlreadyInPool
 	}
 
-	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
+	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
 
 	start := time.Now()
 
@@ -251,22 +256,34 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
 	// In such circumstances we don't need to download the block so don't add it to the queue.
 	if !ignoreInitial {
 		// Add the hash to the queue first
-		d.queue.hashPool.Add(hash)
+		d.queue.hashPool.Add(h)
 	}
 	// Get the first batch of hashes
-	p.getHashes(hash)
+	p.getHashes(h)
 
-	failureResponseTimer := time.NewTimer(hashTtl)
+	var (
+		failureResponseTimer = time.NewTimer(hashTtl)
+		attemptedPeers       = make(map[string]bool) // attempted peers will help with retries
+		activePeer           = p                     // active peer will help determine the current active peer
+		hash                 common.Hash             // common and last hash
+	)
+	attemptedPeers[p.id] = true
 
 out:
 	for {
 		select {
-		case hashes := <-d.hashCh:
+		case hashPack := <-d.hashCh:
+			// make sure the active peer is giving us the hashes
+			if hashPack.peerId != activePeer.id {
+				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
+				break
+			}
+
 			failureResponseTimer.Reset(hashTtl)
 
 			var (
-				done bool        // determines whether we're done fetching hashes (i.e. common hash found)
-				hash common.Hash // current and common hash
+				hashes = hashPack.hashes
+				done   bool // determines whether we're done fetching hashes (i.e. common hash found)
 			)
 			hashSet := set.New()
 			for _, hash = range hashes {
@@ -283,13 +300,13 @@ out:
 
 			// Add hashes to the chunk set
 			if len(hashes) == 0 { // Make sure the peer actually gave you something valid
-				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id)
+				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id)
 				d.queue.reset()
 
 				return errEmptyHashSet
 			} else if !done { // Check if we're done fetching
 				// Get the next set of hashes
-				p.getHashes(hashes[len(hashes)-1])
+				activePeer.getHashes(hash)
 			} else { // we're done
 				// The offset of the queue is determined by the highest known block
 				var offset int
@@ -303,12 +320,30 @@ out:
 			}
 		case <-failureResponseTimer.C:
 			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
-			// TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
-			// 1. check for peer's best hash to be included in the current hash set;
-			// 2. resume from last point (hashes[len(hashes)-1]) using the newly selected peer.
-			d.queue.reset()
 
-			return errTimeout
+			var p *peer // p will be set if a peer can be found
+			// Attempt to find a new peer by checking inclusion of peers best hash in our
+			// already fetched hash list. This can't guarantee 100% correctness but does
+			// a fair job. This is always either correct or false incorrect.
+			for id, peer := range d.peers {
+				if d.queue.hashPool.Has(peer.recentHash) && !attemptedPeers[id] {
+					p = peer
+					break
+				}
+			}
+
+			// if all peers have been tried, abort the process entirely or if the hash is
+			// the zero hash.
+			if p == nil || (hash == common.Hash{}) {
+				d.queue.reset()
+				return errTimeout
+			}
+
+			// set p to the active peer. this will invalidate any hashes that may be returned
+			// by our previous (delayed) peer.
+			activePeer = p
+			p.getHashes(hash)
+			glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
 		}
 	}
 	glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
@@ -454,7 +489,7 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
 		glog.Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.hashPool.Size(), from[:4], to[:4], id)
 	}
 
-	d.hashCh <- hashes
+	d.hashCh <- hashPack{id, hashes}
 
 	return nil
 }
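
A minimal, runnable Go sketch of the pattern this patch introduces may help: replies arriving on one shared channel are tagged with the sender's ID (as hashPack does above), so a late reply from a peer we already switched away from can be recognized and dropped instead of being mistaken for the active peer's answer. Everything here (the pack type, fetch helper, peer names, and timings) is an illustrative assumption for the demo, not the downloader's API.

package main

import (
	"fmt"
	"time"
)

// pack mirrors the role of hashPack: payload plus the ID of the peer that
// sent it. (Illustrative type, not go-ethereum code.)
type pack struct {
	peerID string
	items  []string
}

// fetch simulates an asynchronous peer reply arriving after a delay.
func fetch(ch chan<- pack, peerID string, delay time.Duration) {
	go func() {
		time.Sleep(delay)
		ch <- pack{peerID: peerID, items: []string{"reply from " + peerID}}
	}()
}

func main() {
	ch := make(chan pack, 1)
	timeout := time.NewTimer(50 * time.Millisecond)

	activePeer := "peer-1"
	fetch(ch, activePeer, 70*time.Millisecond) // too slow: the timer fires first

	for i := 0; i < 3; i++ {
		select {
		case p := <-ch:
			if p.peerID != activePeer {
				// Stale reply from the abandoned peer: discard it, as the
				// hashPack.peerId check in the patch does.
				fmt.Println("discarding late", p.items[0])
				continue
			}
			fmt.Println("accepted", p.items[0])
			return
		case <-timeout.C:
			// The active peer missed its deadline: switch peers and retry,
			// mirroring the failureResponseTimer branch.
			activePeer = "peer-2"
			fetch(ch, activePeer, 40*time.Millisecond)
			timeout.Reset(50 * time.Millisecond)
		}
	}
}

Tagging each packet rather than giving every peer its own channel keeps the shape of the select loop unchanged while still making replies attributable, which is presumably why the patch threads the peer ID through AddHashes instead of reworking the channel layout.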