|
|
|
@ -2,6 +2,7 @@ package downloader |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"errors" |
|
|
|
|
"math/rand" |
|
|
|
|
"sync" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
@ -14,15 +15,19 @@ import ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk
|
|
|
|
|
maxHashFetch = 512 // Amount of hashes to be fetched per chunk
|
|
|
|
|
maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
|
|
|
|
|
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
|
|
|
|
|
hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
|
|
|
|
|
hashTTL = 5 * time.Second // Time it takes for a hash request to time out
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
|
|
|
|
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
|
|
|
|
|
blockTTL = 5 * time.Second // Time it takes for a block request to time out
|
|
|
|
|
crossCheckCycle = time.Second // Period after which to check for expired cross checks
|
|
|
|
|
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
errLowTd = errors.New("peer's TD is too low") |
|
|
|
|
ErrBusy = errors.New("busy") |
|
|
|
|
errUnknownPeer = errors.New("peer's unknown or unhealthy") |
|
|
|
@ -34,6 +39,7 @@ var ( |
|
|
|
|
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") |
|
|
|
|
errAlreadyInPool = errors.New("hash already in pool") |
|
|
|
|
ErrInvalidChain = errors.New("retrieved hash chain is invalid") |
|
|
|
|
ErrCrossCheckFailed = errors.New("block cross-check failed") |
|
|
|
|
errCancelHashFetch = errors.New("hash fetching cancelled (requested)") |
|
|
|
|
errCancelBlockFetch = errors.New("block downloading cancelled (requested)") |
|
|
|
|
errNoSyncActive = errors.New("no sync active") |
|
|
|
@ -220,46 +226,47 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { |
|
|
|
|
|
|
|
|
|
start := time.Now() |
|
|
|
|
|
|
|
|
|
// Add the hash to the queue first
|
|
|
|
|
// Add the hash to the queue first, and start hash retrieval
|
|
|
|
|
d.queue.Insert([]common.Hash{h}) |
|
|
|
|
|
|
|
|
|
// Get the first batch of hashes
|
|
|
|
|
p.getHashes(h) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
failureResponseTimer = time.NewTimer(hashTtl) |
|
|
|
|
attemptedPeers = make(map[string]bool) // attempted peers will help with retries
|
|
|
|
|
activePeer = p // active peer will help determine the current active peer
|
|
|
|
|
hash common.Hash // common and last hash
|
|
|
|
|
active = p // active peer will help determine the current active peer
|
|
|
|
|
head = common.Hash{} // common and last hash
|
|
|
|
|
|
|
|
|
|
timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
|
|
|
|
|
attempted = make(map[string]bool) // attempted peers will help with retries
|
|
|
|
|
crossChecks = make(map[common.Hash]time.Time) // running cross checks and their deadline
|
|
|
|
|
crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
|
|
|
|
|
) |
|
|
|
|
attemptedPeers[p.id] = true |
|
|
|
|
defer crossTicker.Stop() |
|
|
|
|
|
|
|
|
|
out: |
|
|
|
|
for { |
|
|
|
|
attempted[p.id] = true |
|
|
|
|
for finished := false; !finished; { |
|
|
|
|
select { |
|
|
|
|
case <-d.cancelCh: |
|
|
|
|
return errCancelHashFetch |
|
|
|
|
|
|
|
|
|
case hashPack := <-d.hashCh: |
|
|
|
|
// Make sure the active peer is giving us the hashes
|
|
|
|
|
if hashPack.peerId != activePeer.id { |
|
|
|
|
if hashPack.peerId != active.id { |
|
|
|
|
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
failureResponseTimer.Reset(hashTtl) |
|
|
|
|
timeout.Reset(hashTTL) |
|
|
|
|
|
|
|
|
|
// Make sure the peer actually gave something valid
|
|
|
|
|
if len(hashPack.hashes) == 0 { |
|
|
|
|
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id) |
|
|
|
|
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) |
|
|
|
|
d.queue.Reset() |
|
|
|
|
|
|
|
|
|
return errEmptyHashSet |
|
|
|
|
} |
|
|
|
|
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
|
|
|
|
|
done, index := false, 0 |
|
|
|
|
for index, hash = range hashPack.hashes { |
|
|
|
|
if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil { |
|
|
|
|
glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) |
|
|
|
|
for index, head = range hashPack.hashes { |
|
|
|
|
if d.hasBlock(head) || d.queue.GetBlock(head) != nil { |
|
|
|
|
glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) |
|
|
|
|
hashPack.hashes = hashPack.hashes[:index] |
|
|
|
|
done = true |
|
|
|
|
break |
|
|
|
@ -267,25 +274,50 @@ out: |
|
|
|
|
} |
|
|
|
|
// Insert all the new hashes, but only continue if got something useful
|
|
|
|
|
inserts := d.queue.Insert(hashPack.hashes) |
|
|
|
|
if inserts == 0 && !done { |
|
|
|
|
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", activePeer.id) |
|
|
|
|
if len(inserts) == 0 && !done { |
|
|
|
|
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) |
|
|
|
|
d.queue.Reset() |
|
|
|
|
|
|
|
|
|
return ErrBadPeer |
|
|
|
|
} |
|
|
|
|
if !done { |
|
|
|
|
activePeer.getHashes(hash) |
|
|
|
|
// Try and fetch a random block to verify the hash batch
|
|
|
|
|
cross := inserts[rand.Intn(len(inserts))] |
|
|
|
|
glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) |
|
|
|
|
|
|
|
|
|
crossChecks[cross] = time.Now().Add(blockTTL) |
|
|
|
|
active.getBlocks([]common.Hash{cross}) |
|
|
|
|
|
|
|
|
|
// Also fetch a fresh
|
|
|
|
|
active.getHashes(head) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// We're done, allocate the download cache and proceed pulling the blocks
|
|
|
|
|
offset := 0 |
|
|
|
|
if block := d.getBlock(hash); block != nil { |
|
|
|
|
if block := d.getBlock(head); block != nil { |
|
|
|
|
offset = int(block.NumberU64() + 1) |
|
|
|
|
} |
|
|
|
|
d.queue.Alloc(offset) |
|
|
|
|
break out |
|
|
|
|
finished = true |
|
|
|
|
|
|
|
|
|
case blockPack := <-d.blockCh: |
|
|
|
|
// Cross check the block with the random verifications
|
|
|
|
|
if blockPack.peerId != active.id || len(blockPack.blocks) != 1 { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
hash := blockPack.blocks[0].Hash() |
|
|
|
|
delete(crossChecks, hash) |
|
|
|
|
|
|
|
|
|
case <-crossTicker.C: |
|
|
|
|
// Iterate over all the cross checks and fail the hash chain if they're not verified
|
|
|
|
|
for hash, deadline := range crossChecks { |
|
|
|
|
if time.Now().After(deadline) { |
|
|
|
|
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) |
|
|
|
|
return ErrCrossCheckFailed |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case <-failureResponseTimer.C: |
|
|
|
|
case <-timeout.C: |
|
|
|
|
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) |
|
|
|
|
|
|
|
|
|
var p *peer // p will be set if a peer can be found
|
|
|
|
@ -293,21 +325,21 @@ out: |
|
|
|
|
// already fetched hash list. This can't guarantee 100% correctness but does
|
|
|
|
|
// a fair job. This is always either correct or false incorrect.
|
|
|
|
|
for _, peer := range d.peers.AllPeers() { |
|
|
|
|
if d.queue.Has(peer.head) && !attemptedPeers[peer.id] { |
|
|
|
|
if d.queue.Has(peer.head) && !attempted[peer.id] { |
|
|
|
|
p = peer |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// if all peers have been tried, abort the process entirely or if the hash is
|
|
|
|
|
// the zero hash.
|
|
|
|
|
if p == nil || (hash == common.Hash{}) { |
|
|
|
|
if p == nil || (head == common.Hash{}) { |
|
|
|
|
d.queue.Reset() |
|
|
|
|
return ErrTimeout |
|
|
|
|
} |
|
|
|
|
// set p to the active peer. this will invalidate any hashes that may be returned
|
|
|
|
|
// by our previous (delayed) peer.
|
|
|
|
|
activePeer = p |
|
|
|
|
p.getHashes(hash) |
|
|
|
|
active = p |
|
|
|
|
p.getHashes(head) |
|
|
|
|
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -359,7 +391,7 @@ out: |
|
|
|
|
// that badly or poorly behave are removed from the peer set (not banned).
|
|
|
|
|
// Bad peers are excluded from the available peer set and therefor won't be
|
|
|
|
|
// reused. XXX We could re-introduce peers after X time.
|
|
|
|
|
badPeers := d.queue.Expire(blockTtl) |
|
|
|
|
badPeers := d.queue.Expire(blockTTL) |
|
|
|
|
for _, pid := range badPeers { |
|
|
|
|
// XXX We could make use of a reputation system here ranking peers
|
|
|
|
|
// in their performance
|
|
|
|
|