@@ -48,23 +48,17 @@ var (
 	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
 	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
-	MaxForkAncestry  = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation
-	hashTTL          = 3 * time.Second     // [eth/61] Time it takes for a hash request to time out
-	blockTargetRTT   = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
-	blockTTL         = 3 * blockTargetRTT  // [eth/61] Maximum time allowance before a block request is considered expired
-	rttMinEstimate   = 2 * time.Second  // Minimum round-trip time to target for download requests
-	rttMaxEstimate   = 20 * time.Second // Maximum round-trip time to target for download requests
-	rttMinConfidence = 0.1              // Worst confidence factor in our estimated RTT value
-	ttlScaling       = 3                // Constant scaling factor for RTT -> TTL conversion
-	ttlLimit         = time.Minute      // Maximum TTL allowance to prevent reaching crazy timeouts
+	MaxForkAncestry = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation
+	rttMinEstimate   = 2 * time.Second  // Minimum round-trip time to target for download requests
+	rttMaxEstimate   = 20 * time.Second // Maximum round-trip time to target for download requests
+	rttMinConfidence = 0.1              // Worst confidence factor in our estimated RTT value
+	ttlScaling       = 3                // Constant scaling factor for RTT -> TTL conversion
+	ttlLimit         = time.Minute      // Maximum TTL allowance to prevent reaching crazy timeouts
 	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
 	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
 	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
-	maxQueuedHashes   = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
 	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
 	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
 	maxResultsProcess = 2048      // Number of content download results to import at once into the chain
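As an aside, the rtt*/ttl* knobs above combine into a per-request timeout by a simple scale-and-clamp; the sketch below shows only that arithmetic (the downloader's requestTTL additionally weights the estimate by its measured confidence, which is omitted here):

package main

import (
	"fmt"
	"time"
)

const (
	ttlScaling = 3           // Constant scaling factor for RTT -> TTL conversion
	ttlLimit   = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
)

// requestTTL is a minimal sketch: scale the estimated round-trip time,
// then clamp the result so a bad estimate can never yield a crazy timeout.
func requestTTL(rtt time.Duration) time.Duration {
	ttl := time.Duration(ttlScaling) * rtt
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl
}

func main() {
	// rttMinEstimate and rttMaxEstimate bound the input in practice; anything
	// slower still gets capped at the one minute ttlLimit after scaling.
	fmt.Println(requestTTL(2 * time.Second))  // 6s
	fmt.Println(requestTTL(20 * time.Second)) // 1m0s
	fmt.Println(requestTTL(30 * time.Second)) // 1m0s
}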
@@ -84,16 +78,13 @@ var (
 	errStallingPeer     = errors.New("peer is stalling")
 	errNoPeers          = errors.New("no peers to keep download active")
 	errTimeout          = errors.New("timeout")
-	errEmptyHashSet     = errors.New("empty hash set by peer")
 	errEmptyHeaderSet   = errors.New("empty header set by peer")
 	errPeersUnavailable = errors.New("no peers available or all tried for download")
-	errAlreadyInPool    = errors.New("hash already in pool")
 	errInvalidAncestor  = errors.New("retrieved ancestor is invalid")
 	errInvalidChain     = errors.New("retrieved hash chain is invalid")
 	errInvalidBlock     = errors.New("retrieved block is invalid")
 	errInvalidBody      = errors.New("retrieved block body is invalid")
 	errInvalidReceipt   = errors.New("retrieved receipt is invalid")
-	errCancelHashFetch   = errors.New("hash download canceled (requested)")
 	errCancelBlockFetch  = errors.New("block download canceled (requested)")
 	errCancelHeaderFetch = errors.New("block header download canceled (requested)")
 	errCancelBodyFetch   = errors.New("block body download canceled (requested)")
@@ -102,6 +93,7 @@ var (
 	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
 	errCancelContentProcessing = errors.New("content processing canceled (requested)")
 	errNoSyncActive            = errors.New("no sync active")
+	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
 )

 type Downloader struct {
@@ -146,20 +138,19 @@ type Downloader struct {
 	// Channels
 	newPeerCh chan *peer
-	hashCh    chan dataPack // [eth/61] Channel receiving inbound hashes
-	blockCh   chan dataPack // [eth/61] Channel receiving inbound blocks
 	headerCh  chan dataPack // [eth/62] Channel receiving inbound block headers
 	bodyCh    chan dataPack // [eth/62] Channel receiving inbound block bodies
 	receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
 	stateCh   chan dataPack // [eth/63] Channel receiving inbound node state data
-	blockWakeCh   chan bool // [eth/61] Channel to signal the block fetcher of new tasks
 	bodyWakeCh    chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
 	receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
 	stateWakeCh   chan bool // [eth/63] Channel to signal the state fetcher of new tasks
 	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

 	// Cancellation and termination
+	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
 	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
-	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
+	cancelLock sync.RWMutex  // Lock to protect the cancel channel and peer in delivers

 	quitCh   chan struct{} // Quit channel to signal termination
 	quitLock sync.RWMutex  // Lock to prevent double closes
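All the *Ch channels above carry deliveries behind a small interface; its likely shape is sketched here (method names inferred from usage such as packet.PeerId() and packet.Items() later in this file):

// Sketch of the delivery interface, inferred from usage in this file; the
// concrete packs (hashPack, blockPack, headerPack, ...) implement it.
type dataPack interface {
	PeerId() string // originating peer, checked against the active request
	Items() int     // number of items carried, used for quick emptiness tests
}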
@@ -199,13 +190,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
 		rollback:      rollback,
 		dropPeer:      dropPeer,
 		newPeerCh:     make(chan *peer, 1),
-		hashCh:        make(chan dataPack, 1),
-		blockCh:       make(chan dataPack, 1),
 		headerCh:      make(chan dataPack, 1),
 		bodyCh:        make(chan dataPack, 1),
 		receiptCh:     make(chan dataPack, 1),
 		stateCh:       make(chan dataPack, 1),
-		blockWakeCh:   make(chan bool, 1),
 		bodyWakeCh:    make(chan bool, 1),
 		receiptWakeCh: make(chan bool, 1),
 		stateWakeCh:   make(chan bool, 1),
@@ -250,13 +238,12 @@ func (d *Downloader) Synchronising() bool {
 // RegisterPeer injects a new download peer into the set of block sources to be
 // used for fetching hashes and blocks from.
-func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
-	getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
+func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
 	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
 	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {

 	glog.V(logger.Detail).Infoln("Registering peer", id)
-	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
+	if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
 		glog.V(logger.Error).Infoln("Register failed:", err)
 		return err
 	}
@@ -269,12 +256,22 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
 // the specified peer. An effort is also made to return any pending fetches into
 // the queue.
 func (d *Downloader) UnregisterPeer(id string) error {
+	// Unregister the peer from the active peer set and revoke any fetch tasks
 	glog.V(logger.Detail).Infoln("Unregistering peer", id)
 	if err := d.peers.Unregister(id); err != nil {
 		glog.V(logger.Error).Infoln("Unregister failed:", err)
 		return err
 	}
+	d.queue.Revoke(id)
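+	// Revoke (see queue.Revoke) is expected to hand any fetches still reserved
+	// by this peer back to the task queues, so idle peers can pick them up.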
+	// If this peer was the master peer, abort sync immediately
+	d.cancelLock.RLock()
+	master := id == d.cancelPeer
+	d.cancelLock.RUnlock()
+	if master {
+		d.cancel()
+	}
 	return nil
 }
@@ -291,7 +288,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 	case errBusy:
 		glog.V(logger.Detail).Infof("Synchronisation already in progress")

-	case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidAncestor, errInvalidChain:
+	case errTimeout, errBadPeer, errStallingPeer,
+		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
+		errInvalidAncestor, errInvalidChain:
 		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
 		d.dropPeer(id)
@@ -323,13 +322,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	d.queue.Reset()
 	d.peers.Reset()

-	for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
 		select {
 		case <-ch:
 		default:
 		}
 	}
-	for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
 		for empty := false; !empty; {
 			select {
 			case <-ch:
@@ -345,9 +344,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 				empty = true
 			}
 		}
-	// Create cancel channel for aborting mid-flight
+	// Create cancel channel for aborting mid-flight and mark the master peer
 	d.cancelLock.Lock()
 	d.cancelCh = make(chan struct{})
+	d.cancelPeer = id
 	d.cancelLock.Unlock()

 	defer d.cancel() // No matter what, we can't leave the cancel channel open
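The two drain loops above rely on Go's non-blocking receive; a self-contained illustration of the same idiom:

package main

// drain empties a signal channel without ever blocking: if nothing is
// pending, the default case fires and the loop returns immediately.
func drain(ch chan bool) {
	for {
		select {
		case <-ch:
		default:
			return
		}
	}
}

func main() {
	ch := make(chan bool, 1)
	ch <- true
	drain(ch) // consumes a stale wake-up signal left over from a prior sync
}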
@@ -377,105 +377,73 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
 			d.mux.Post(DoneEvent{})
 		}
 	}()
+	if p.version < 62 {
+		return errTooOld
+	}
 	glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
 	defer func(start time.Time) {
 		glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
 	}(time.Now())

-	switch {
-	case p.version == 61:
-		// Look up the sync boundaries: the common ancestor and the target block
-		latest, err := d.fetchHeight61(p)
-		if err != nil {
-			return err
-		}
-		origin, err := d.findAncestor61(p, latest)
-		if err != nil {
-			return err
-		}
-		d.syncStatsLock.Lock()
-		if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
-			d.syncStatsChainOrigin = origin
-		}
-		d.syncStatsChainHeight = latest
-		d.syncStatsLock.Unlock()
+	// Look up the sync boundaries: the common ancestor and the target block
+	latest, err := d.fetchHeight(p)
+	if err != nil {
+		return err
+	}
+	height := latest.Number.Uint64()
-		// Initiate the sync using a concurrent hash and block retrieval algorithm
-		d.queue.Prepare(origin+1, d.mode, 0, nil)
-		if d.syncInitHook != nil {
-			d.syncInitHook(origin, latest)
-		}
-		return d.spawnSync(origin+1,
-			func() error { return d.fetchHashes61(p, td, origin+1) },
-			func() error { return d.fetchBlocks61(origin + 1) },
-		)
-	case p.version >= 62:
-		// Look up the sync boundaries: the common ancestor and the target block
-		latest, err := d.fetchHeight(p)
-		if err != nil {
-			return err
-		}
-		height := latest.Number.Uint64()
-		origin, err := d.findAncestor(p, height)
-		if err != nil {
-			return err
-		}
-		d.syncStatsLock.Lock()
-		if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
-			d.syncStatsChainOrigin = origin
-		}
-		d.syncStatsChainHeight = height
-		d.syncStatsLock.Unlock()
+	origin, err := d.findAncestor(p, height)
+	if err != nil {
+		return err
+	}
+	d.syncStatsLock.Lock()
+	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+		d.syncStatsChainOrigin = origin
+	}
+	d.syncStatsChainHeight = height
+	d.syncStatsLock.Unlock()
-		// Initiate the sync using a concurrent header and content retrieval algorithm
-		pivot := uint64(0)
-		switch d.mode {
-		case LightSync:
-			pivot = height
-		case FastSync:
-			// Calculate the new fast/slow sync pivot point
-			if d.fsPivotLock == nil {
-				pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
-				if err != nil {
-					panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
-				}
-				if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
-					pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
-				}
-			} else {
-				// Pivot point locked in, use this and do not pick a new one!
-				pivot = d.fsPivotLock.Number.Uint64()
+	// Initiate the sync using a concurrent header and content retrieval algorithm
+	pivot := uint64(0)
+	switch d.mode {
+	case LightSync:
+		pivot = height
+	case FastSync:
+		// Calculate the new fast/slow sync pivot point
+		if d.fsPivotLock == nil {
+			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
+			if err != nil {
+				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
+			}
-			// If the point is below the origin, move origin back to ensure state download
-			if pivot < origin {
-				if pivot > 0 {
-					origin = pivot - 1
-				} else {
-					origin = 0
-				}
+			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
+				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
 			}
-			glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
+		} else {
+			// Pivot point locked in, use this and do not pick a new one!
+			pivot = d.fsPivotLock.Number.Uint64()
 		}
-		d.queue.Prepare(origin+1, d.mode, pivot, latest)
-		if d.syncInitHook != nil {
-			d.syncInitHook(origin, height)
+		// If the point is below the origin, move origin back to ensure state download
+		if pivot < origin {
+			if pivot > 0 {
+				origin = pivot - 1
+			} else {
+				origin = 0
+			}
 		}
-		return d.spawnSync(origin+1,
-			func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
-			func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
-			func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
-			func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
-			func() error { return d.fetchNodeData() },              // Node state data is retrieved during fast sync
-		)
-	default:
-		// Something very wrong, stop right here
-		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
-		return errBadPeer
+		glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
 	}
+	d.queue.Prepare(origin+1, d.mode, pivot, latest)
+	if d.syncInitHook != nil {
+		d.syncInitHook(origin, height)
+	}
+	return d.spawnSync(origin+1,
+		func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
+		func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
+		func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
+		func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
+		func() error { return d.fetchNodeData() },              // Node state data is retrieved during fast sync
+	)
 }
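For a concrete feel of the pivot selection, a hypothetical walk-through with assumed constants (fsMinFullBlocks = 1024, fsPivotInterval = 512; the real values are defined elsewhere in this file):

package main

import "fmt"

const (
	fsMinFullBlocks = 1024 // blocks always executed fully below the head (assumed)
	fsPivotInterval = 512  // span the random pivot offset is drawn from (assumed)
)

// pivotFor mirrors the unlocked-pivot branch above: back off from the head by
// the full-block window plus a random offset, or stay at genesis if too short.
func pivotFor(height, pivotOffset uint64) uint64 {
	if height > fsMinFullBlocks+pivotOffset {
		return height - fsMinFullBlocks - pivotOffset
	}
	return 0
}

func main() {
	fmt.Println(pivotFor(100000, 137)) // 98839: state downloaded here, the rest executed
	fmt.Println(pivotFor(900, 137))    // 0: chain too short, effectively a full sync
}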
 // spawnSync runs d.process and all given fetcher functions to completion in
@@ -540,459 +508,14 @@ func (d *Downloader) Terminate() {
 	d.cancel()
 }
-// fetchHeight61 retrieves the head block of the remote peer to aid in estimating
-// the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
-	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
-	// Request the advertised remote head block and wait for the response
-	go p.getBlocks([]common.Hash{p.head})
-	timeout := time.After(hashTTL)
-	for {
-		select {
-		case <-d.cancelCh:
-			return 0, errCancelBlockFetch
-		case packet := <-d.blockCh:
-			// Discard anything not from the origin peer
-			if packet.PeerId() != p.id {
-				glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
-				break
-			}
-			// Make sure the peer actually gave something valid
-			blocks := packet.(*blockPack).blocks
-			if len(blocks) != 1 {
-				glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
-				return 0, errBadPeer
-			}
-			return blocks[0].NumberU64(), nil
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head block timeout", p)
-			return 0, errTimeout
-		case <-d.hashCh:
-			// Out of bounds hashes received, ignore them
-		case <-d.headerCh:
-		case <-d.bodyCh:
-		case <-d.stateCh:
-		case <-d.receiptCh:
-			// Ignore eth/{62,63} packets because this is eth/61.
-			// These can arrive as a late delivery from a previous sync.
-		}
-	}
-}
-// findAncestor61 tries to locate the common ancestor block of the local chain and
-// a remote peer's blockchain. In the general case when our node was in sync and
-// on the correct chain, checking the top N blocks should already get us a match.
-// In the rare scenario when we ended up on a long reorganisation (i.e. none of
-// the head blocks match), we do a binary search to find the common ancestor.
-func (d *Downloader) findAncestor61(p *peer, height uint64) (uint64, error) {
-	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
-	// Figure out the valid ancestor range to prevent rewrite attacks
-	floor, ceil := int64(-1), d.headBlock().NumberU64()
-	if ceil >= MaxForkAncestry {
-		floor = int64(ceil - MaxForkAncestry)
-	}
-	// Request the topmost blocks to short circuit binary ancestor lookup
-	head := ceil
-	if head > height {
-		head = height
-	}
-	from := int64(head) - int64(MaxHashFetch) + 1
-	if from < 0 {
-		from = 0
-	}
-	go p.getAbsHashes(uint64(from), MaxHashFetch)
-	// Wait for the remote response to the head fetch
-	number, hash := uint64(0), common.Hash{}
-	timeout := time.After(hashTTL)
-	for finished := false; !finished; {
-		select {
-		case <-d.cancelCh:
-			return 0, errCancelHashFetch
-		case packet := <-d.hashCh:
-			// Discard anything not from the origin peer
-			if packet.PeerId() != p.id {
-				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
-				break
-			}
-			// Make sure the peer actually gave something valid
-			hashes := packet.(*hashPack).hashes
-			if len(hashes) == 0 {
-				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
-				return 0, errEmptyHashSet
-			}
-			// Check if a common ancestor was found
-			finished = true
-			for i := len(hashes) - 1; i >= 0; i-- {
-				// Skip any headers that underflow/overflow our requested set
-				header := d.getHeader(hashes[i])
-				if header == nil || header.Number.Int64() < from || header.Number.Uint64() > head {
-					continue
-				}
-				// Otherwise check if we already know the header or not
-				if d.hasBlockAndState(hashes[i]) {
-					number, hash = header.Number.Uint64(), header.Hash()
-					break
-				}
-			}
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
-			return 0, errTimeout
-		case <-d.blockCh:
-			// Out of bounds blocks received, ignore them
-		case <-d.headerCh:
-		case <-d.bodyCh:
-		case <-d.stateCh:
-		case <-d.receiptCh:
-			// Ignore eth/{62,63} packets because this is eth/61.
-			// These can arrive as a late delivery from a previous sync.
-		}
-	}
-	// If the head fetch already found an ancestor, return
-	if !common.EmptyHash(hash) {
-		if int64(number) <= floor {
-			glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
-			return 0, errInvalidAncestor
-		}
-		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
-		return number, nil
-	}
-	// Ancestor not found, we need to binary search over our chain
-	start, end := uint64(0), head
-	if floor > 0 {
-		start = uint64(floor)
-	}
-	for start+1 < end {
-		// Split our chain interval in two, and request the hash to cross check
-		check := (start + end) / 2
-		timeout := time.After(hashTTL)
-		go p.getAbsHashes(uint64(check), 1)
-		// Wait until a reply arrives to this request
-		for arrived := false; !arrived; {
-			select {
-			case <-d.cancelCh:
-				return 0, errCancelHashFetch
-			case packet := <-d.hashCh:
-				// Discard anything not from the origin peer
-				if packet.PeerId() != p.id {
-					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
-					break
-				}
-				// Make sure the peer actually gave something valid
-				hashes := packet.(*hashPack).hashes
-				if len(hashes) != 1 {
-					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
-					return 0, errBadPeer
-				}
-				arrived = true
-				// Modify the search interval based on the response
-				if !d.hasBlockAndState(hashes[0]) {
-					end = check
-					break
-				}
-				block := d.getBlock(hashes[0]) // this doesn't check state, hence the above explicit check
-				if block.NumberU64() != check {
-					glog.V(logger.Debug).Infof("%v: non-requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
-					return 0, errBadPeer
-				}
-				start = check
-			case <-timeout:
-				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
-				return 0, errTimeout
-			case <-d.blockCh:
-				// Out of bounds blocks received, ignore them
-			case <-d.headerCh:
-			case <-d.bodyCh:
-			case <-d.stateCh:
-			case <-d.receiptCh:
-				// Ignore eth/{62,63} packets because this is eth/61.
-				// These can arrive as a late delivery from a previous sync.
-			}
-		}
-	}
-	// Ensure valid ancestry and return
-	if int64(start) <= floor {
-		glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
-		return 0, errInvalidAncestor
-	}
-	glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
-	return start, nil
-}
-// fetchHashes61 keeps retrieving hashes from the requested number, until no more
-// are returned, potentially throttling on the way.
-func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
-	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
-	// Create a timeout timer, and the associated hash fetcher
-	request := time.Now()       // time of the last fetch request
-	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
-	<-timeout.C                 // timeout channel should be initially empty
-	defer timeout.Stop()
-	getHashes := func(from uint64) {
-		glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
-		request = time.Now()
-		timeout.Reset(hashTTL)
-		go p.getAbsHashes(from, MaxHashFetch)
-	}
-	// Start pulling hashes, until all are exhausted
-	getHashes(from)
-	gotHashes := false
-	for {
-		select {
-		case <-d.cancelCh:
-			return errCancelHashFetch
-		case packet := <-d.hashCh:
-			// Make sure the active peer is giving us the hashes
-			if packet.PeerId() != p.id {
-				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
-				break
-			}
-			hashReqTimer.UpdateSince(request)
-			timeout.Stop()
-			// If no more hashes are inbound, notify the block fetcher and return
-			if packet.Items() == 0 {
-				glog.V(logger.Debug).Infof("%v: no available hashes", p)
-				select {
-				case d.blockWakeCh <- false:
-				case <-d.cancelCh:
-				}
-				// If no hashes were retrieved at all, the peer violated its TD promise that it had a
-				// better chain compared to ours. The only exception is if its promised blocks were
-				// already imported by other means (e.g. fetcher):
-				//
-				// R <remote peer>, L <local node>: Both at block 10
-				// R: Mine block 11, and propagate it to L
-				// L: Queue block 11 for import
-				// L: Notice that R's head and TD increased compared to ours, start sync
-				// L: Import of block 11 finishes
-				// L: Sync begins, and finds common ancestor at 11
-				// L: Request new hashes up from 11 (R's TD was higher, it must have something)
-				// R: Nothing to give
-				if !gotHashes && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
-					return errStallingPeer
-				}
-				return nil
-			}
-			gotHashes = true
-			hashes := packet.(*hashPack).hashes
-			// Otherwise insert all the new hashes, aborting in case of junk
-			glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
-			inserts := d.queue.Schedule61(hashes, true)
-			if len(inserts) != len(hashes) {
-				glog.V(logger.Debug).Infof("%v: stale hashes", p)
-				return errBadPeer
-			}
-			// Notify the block fetcher of new hashes, but stop if queue is full
-			if d.queue.PendingBlocks() < maxQueuedHashes {
-				// We still have hashes to fetch, send continuation wake signal (potential)
-				select {
-				case d.blockWakeCh <- true:
-				default:
-				}
-			} else {
-				// Hash limit reached, send a termination wake signal (enforced)
-				select {
-				case d.blockWakeCh <- false:
-				case <-d.cancelCh:
-				}
-				return nil
-			}
-			// Queue not yet full, fetch the next batch
-			from += uint64(len(hashes))
-			getHashes(from)
-		case <-timeout.C:
-			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
-			hashTimeoutMeter.Mark(1)
-			return errTimeout
-		case <-d.headerCh:
-		case <-d.bodyCh:
-		case <-d.stateCh:
-		case <-d.receiptCh:
-			// Ignore eth/{62,63} packets because this is eth/61.
-			// These can arrive as a late delivery from a previous sync.
-		}
-	}
-}
-// fetchBlocks61 iteratively downloads the scheduled hashes, taking any available
-// peers, reserving a chunk of blocks for each, waiting for delivery and also
-// periodically checking for timeouts.
-func (d *Downloader) fetchBlocks61(from uint64) error {
-	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
-	defer glog.V(logger.Debug).Infof("Block download terminated")
-	// Create a timeout timer for scheduling expiration tasks
-	ticker := time.NewTicker(100 * time.Millisecond)
-	defer ticker.Stop()
-	update := make(chan struct{}, 1)
-	// Fetch blocks until the hash fetcher's done
-	finished := false
-	for {
-		select {
-		case <-d.cancelCh:
-			return errCancelBlockFetch
-		case packet := <-d.blockCh:
-			// If the peer was previously banned and failed to deliver its pack
-			// in a reasonable time frame, ignore its message.
-			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
-				blocks := packet.(*blockPack).blocks
-				// Deliver the received chunk of blocks and check chain validity
-				accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
-				if err == errInvalidChain {
-					return err
-				}
-				// Unless a peer delivered something completely else than requested (usually
-				// caused by a timed out request which came through in the end), set it to
-				// idle. If the delivery's stale, the peer should have already been idled.
-				if err != errStaleDelivery {
-					peer.SetBlocksIdle(accepted)
-				}
-				// Issue a log to the user to see what's going on
-				switch {
-				case err == nil && len(blocks) == 0:
-					glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
-				case err == nil:
-					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
-				default:
-					glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
-				}
-			}
-			// Blocks arrived, try to update the progress
-			select {
-			case update <- struct{}{}:
-			default:
-			}
-		case cont := <-d.blockWakeCh:
-			// The hash fetcher sent a continuation flag, check if it's done
-			if !cont {
-				finished = true
-			}
-			// Hashes arrive, try to update the progress
-			select {
-			case update <- struct{}{}:
-			default:
-			}
-		case <-ticker.C:
-			// Sanity check update the progress
-			select {
-			case update <- struct{}{}:
-			default:
-			}
-		case <-update:
-			// Short circuit if we lost all our peers
-			if d.peers.Len() == 0 {
-				return errNoPeers
-			}
-			// Check for block request timeouts and demote the responsible peers
-			for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
-				if peer := d.peers.Peer(pid); peer != nil {
-					if fails > 1 {
-						glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
-						peer.SetBlocksIdle(0)
-					} else {
-						glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
-						d.dropPeer(pid)
-					}
-				}
-			}
-			// If there's nothing more to fetch, wait or terminate
-			if d.queue.PendingBlocks() == 0 {
-				if !d.queue.InFlightBlocks() && finished {
-					glog.V(logger.Debug).Infof("Block fetching completed")
-					return nil
-				}
-				break
-			}
-			// Send a download request to all idle peers, until throttled
-			throttled := false
-			idles, total := d.peers.BlockIdlePeers()
-			for _, peer := range idles {
-				// Short circuit if throttling activated
-				if d.queue.ShouldThrottleBlocks() {
-					throttled = true
-					break
-				}
-				// Reserve a chunk of hashes for a peer. A nil can mean either that
-				// no more hashes are available, or that the peer is known not to
-				// have them.
-				request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
-				if request == nil {
-					continue
-				}
-				if glog.V(logger.Detail) {
-					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
-				}
-				// Fetch the chunk and make sure any errors return the hashes to the queue
-				if err := peer.Fetch61(request); err != nil {
-					// Although we could try and make an attempt to fix this, this error really
-					// means that we've double allocated a fetch task to a peer. If that is the
-					// case, the internal state of the downloader and the queue is very wrong so
-					// better hard crash and note the error instead of silently accumulating into
-					// a much bigger issue.
-					panic(fmt.Sprintf("%v: fetch assignment failed", peer))
-				}
-			}
-			// Make sure that we have peers available for fetching. If all peers have been tried
-			// and all failed, throw an error
-			if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
-				return errPeersUnavailable
-			}
-		case <-d.headerCh:
-		case <-d.bodyCh:
-		case <-d.stateCh:
-		case <-d.receiptCh:
-			// Ignore eth/{62,63} packets because this is eth/61.
-			// These can arrive as a late delivery from a previous sync.
-		}
-	}
-}
 // fetchHeight retrieves the head header of the remote peer to aid in estimating
 // the total time a pending synchronisation would take.
 func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
 	// Request the advertised remote head block and wait for the response
-	go p.getRelHeaders(p.head, 1, 0, false)
+	head, _ := p.currentHead()
+	go p.getRelHeaders(head, 1, 0, false)
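+	// The head hash is read from the peer at request time rather than from the
+	// value cached at registration, presumably so a head announced mid-sync is
+	// the one actually requested.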
 	timeout := time.After(d.requestTTL())
 	for {
@@ -1022,11 +545,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
-		case <-d.hashCh:
-		case <-d.blockCh:
-			// Ignore eth/61 packets because this is eth/62+.
-			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1037,7 +555,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 // In the rare scenario when we ended up on a long reorganisation (i.e. none of
 // the head links match), we do a binary search to find the common ancestor.
 func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
-	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
+	glog.V(logger.Debug).Infof("%v: looking for common ancestor (remote height %d)", p, height)

 	// Figure out the valid ancestor range to prevent rewrite attacks
 	floor, ceil := int64(-1), d.headHeader().Number.Uint64()
@@ -1054,11 +572,17 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 	if head > height {
 		head = height
 	}
-	from := int64(head) - int64(MaxHeaderFetch) + 1
+	from := int64(head) - int64(MaxHeaderFetch)
 	if from < 0 {
 		from = 0
 	}
-	go p.getAbsHeaders(uint64(from), MaxHeaderFetch, 0, false)
+	// Span out with 15 block gaps into the future to catch bad head reports
+	limit := 2 * MaxHeaderFetch / 16
+	count := 1 + int((int64(ceil)-from)/16)
+	if count > limit {
+		count = limit
+	}
+	go p.getAbsHeaders(uint64(from), count, 15, false)
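+	// With skip = 15 every 16th header is probed; assuming the package's usual
+	// MaxHeaderFetch of 192, at most 24 headers are requested at numbers from,
+	// from+16, from+32, ..., spanning up to 2*MaxHeaderFetch blocks and
+	// deliberately reaching past the reported head.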
 	// Wait for the remote response to the head fetch
 	number, hash := uint64(0), common.Hash{}
@@ -1067,7 +591,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 	for finished := false; !finished; {
 		select {
 		case <-d.cancelCh:
-			return 0, errCancelHashFetch
+			return 0, errCancelHeaderFetch

 		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
@@ -1083,12 +607,8 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 			}
 			// Make sure the peer's reply conforms to the request
 			for i := 0; i < len(headers); i++ {
-				if number := headers[i].Number.Int64(); number != from+int64(i) {
-					glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i), number)
-					return 0, errInvalidChain
-				}
-				if i > 0 && headers[i-1].Hash() != headers[i].ParentHash {
-					glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ancestry: expected [%x], got [%x]", p, i, headers[i-1].Hash().Bytes()[:4], headers[i].ParentHash[:4])
+				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
+					glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number)
 					return 0, errInvalidChain
 				}
 			}
@@ -1096,12 +616,18 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 			finished = true
 			for i := len(headers) - 1; i >= 0; i-- {
 				// Skip any headers that underflow/overflow our requested set
-				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > head {
+				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
 					continue
 				}
 				// Otherwise check if we already know the header or not
 				if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
 					number, hash = headers[i].Number.Uint64(), headers[i].Hash()

+					// If every header is known, even future ones, the peer straight out lied about its head
+					if number > height && i == limit-1 {
+						glog.V(logger.Warn).Infof("%v: lied about chain head: reported %d, found above %d", p, height, number)
+						return 0, errStallingPeer
+					}
 					break
 				}
 			}
@@ -1114,11 +640,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
-		case <-d.hashCh:
-		case <-d.blockCh:
-			// Ignore eth/61 packets because this is eth/62+.
-			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 	// If the head fetch already found an ancestor, return
@@ -1146,7 +667,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 	for arrived := false; !arrived; {
 		select {
 		case <-d.cancelCh:
-			return 0, errCancelHashFetch
+			return 0, errCancelHeaderFetch

 		case packer := <-d.headerCh:
 			// Discard anything not from the origin peer
@@ -1182,11 +703,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 		case <-d.stateCh:
 		case <-d.receiptCh:
 			// Out of bounds delivery, ignore
-		case <-d.hashCh:
-		case <-d.blockCh:
-			// Ignore eth/61 packets because this is eth/62+.
-			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
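The ancestry lookup above is a binary search maintaining the invariant that start is always a locally known block while end possibly is not; a stripped-down analogue (hypothetical helper, not part of the downloader):

// commonAncestor returns the highest block number <= head that `have` reports
// as locally known, assuming `have` is monotone (known up to some point, then
// unknown) — the same invariant the downloader's search relies on.
func commonAncestor(have func(uint64) bool, head uint64) uint64 {
	start, end := uint64(0), head
	for start+1 < end {
		check := (start + end) / 2
		if have(check) {
			start = check // ancestor is at check or above
		} else {
			end = check // ancestor is strictly below check
		}
	}
	return start
}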
@@ -1305,11 +821,6 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
 			case <-d.cancelCh:
 			}
 			return errBadPeer
-		case <-d.hashCh:
-		case <-d.blockCh:
-			// Ignore eth/61 packets because this is eth/62+.
-			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1555,7 +1066,14 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 		// Check for fetch request timeouts and demote the responsible peers
 		for pid, fails := range expire() {
 			if peer := d.peers.Peer(pid); peer != nil {
-				if fails > 1 {
+				// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
+				// ourselves. Only reset to minimal throughput, but don't drop just yet. If even the minimal request
+				// times out, then sync-wise we need to get rid of the peer.
+				//
+				// The reason the minimum threshold is 2 is that the downloader tries to estimate the bandwidth
+				// and latency of a peer separately, which requires pushing the measured capacity a bit and seeing
+				// how response times react to it; it therefore always requests one more than the minimum (i.e. min 2).
+				if fails > 2 {
 					glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
 					setIdle(peer, 0)
 				} else {
@@ -1623,11 +1141,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
 				return errPeersUnavailable
 			}
-		case <-d.hashCh:
-		case <-d.blockCh:
-			// Ignore eth/61 packets because this is eth/62+.
-			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1867,19 +1380,6 @@ func (d *Downloader) processContent() error {
 	}
 }
-// DeliverHashes injects a new batch of hashes received from a remote node into
-// the download schedule. This is usually invoked through the BlockHashesMsg by
-// the protocol handler.
-func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
-	return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
-}
-// DeliverBlocks injects a new batch of blocks received from a remote node.
-// This is usually invoked through the BlocksMsg by the protocol handler.
-func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
-	return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
-}
 // DeliverHeaders injects a new batch of block headers received from a remote
 // node into the download schedule.
 func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {