@@ -23,7 +23,6 @@ import (
"fmt"
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
@@ -248,9 +247,10 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
log.Trace(fmt.Sprint("Registering peer", id))
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
log.Error(fmt.Sprint("Register failed:", err))
logger := log.New("peer", id)
logger.Trace("Registering sync peer")
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil {
logger.Error("Failed to register sync peer", "error", err)
return err
}
d.qosReduceConfidence()
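The hunk above establishes the pattern used throughout the rest of this diff: a contextual logger is created once with the peer id bound to it, and subsequent entries pass raw values as key-value pairs instead of pre-formatted strings. A minimal, self-contained sketch of that pattern, assuming the log15-style log package already imported in this file (the peer id and error value are placeholders):

package main

import (
    "errors"

    "github.com/ethereum/go-ethereum/log"
)

func main() {
    // Bind the peer id once; every record emitted through logger carries peer=abcd1234.
    logger := log.New("peer", "abcd1234")
    logger.Trace("Registering sync peer")

    // Values are passed raw as key-value pairs; the configured handler does the formatting.
    err := errors.New("peer already registered")
    logger.Error("Failed to register sync peer", "error", err)
}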
@@ -263,9 +263,10 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
// the queue.
func (d *Downloader) UnregisterPeer(id string) error {
// Unregister the peer from the active peer set and revoke any fetch tasks
log.Trace(fmt.Sprint("Unregistering peer", id))
logger := log.New("peer", id)
logger.Trace("Unregistering sync peer")
if err := d.peers.Unregister(id); err != nil {
log.Error(fmt.Sprint("Unregister failed:", err))
logger.Error("Failed to unregister sync peer", "error", err)
return err
}
d.queue.Revoke(id)
@@ -284,24 +285,19 @@ func (d *Downloader) UnregisterPeer(id string) error {
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
log.Trace(fmt.Sprintf("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td))
err := d.synchronise(id, head, td, mode)
switch err {
case nil:
log.Trace(fmt.Sprintf("Synchronisation completed"))
case errBusy:
log.Trace(fmt.Sprintf("Synchronisation already in progress"))
case errTimeout, errBadPeer, errStallingPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor, errInvalidChain:
log.Debug(fmt.Sprintf("Removing peer %v: %v", id, err))
log.Warn("Synchronisation failed, dropping peer", "peer", id, "error", err)
d.dropPeer(id)
default:
log.Warn(fmt.Sprintf("Synchronisation failed: %v", err))
log.Warn("Synchronisation failed, retrying", "error", err)
}
return err
}
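The switch above separates peer-fatal failures (the peer is dropped) from transient ones (the sync is simply retried). A hedged sketch of that decision as a standalone predicate; isPeerFatal is a hypothetical helper that reuses the error variables declared elsewhere in this package:

// Hypothetical helper mirroring the switch in Synchronise: true means the error
// indicts the remote peer, so it should be dropped rather than retried.
func isPeerFatal(err error) bool {
    switch err {
    case errTimeout, errBadPeer, errStallingPeer,
        errEmptyHeaderSet, errPeersUnavailable, errTooOld,
        errInvalidAncestor, errInvalidChain:
        return true
    default:
        return false
    }
}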
@@ -322,7 +318,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info(fmt.Sprint("Block synchronisation started"))
log.Info("Block synchronisation started")
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
@@ -387,9 +383,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return errTooOld
}
log.Debug(fmt.Sprintf("Synchronising with the network using: %s [eth/%d]", p.id, p.version))
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash.Hex()[2:10], "td", td, "mode", syncModeLabels[d.mode])
defer func(start time.Time) {
log.Debug(fmt.Sprintf("Synchronisation terminated after %v", time.Since(start)))
log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
}(time.Now())
// Look up the sync boundaries: the common ancestor and the target block
@@ -437,7 +433,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
origin = 0
}
}
log.Debug(fmt.Sprintf("Fast syncing until pivot block #%d", pivot))
log.Debug("Fast syncing until pivot block", "pivot", pivot)
}
d.queue.Prepare(origin+1, d.mode, pivot, latest)
if d.syncInitHook != nil {
@@ -522,13 +518,14 @@ func (d *Downloader) Terminate() {
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
log.Debug(fmt.Sprintf("%v: retrieving remote chain height", p))
p.logger.Debug("Retrieving remote chain height")
// Request the advertised remote head block and wait for the response
head, _ := p.currentHead()
go p.getRelHeaders(head, 1, 0, false)
timeout := time.After(d.requestTTL())
ttl := d.requestTTL()
timeout := time.After(ttl)
for {
select {
case <-d.cancelCh:
@@ -537,19 +534,21 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packet.PeerId()))
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
log.Debug(fmt.Sprintf("%v: invalid number of head headers: %d != 1", p, len(headers)))
p.logger.Debug("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
}
return headers[0], nil
head := headers[0]
p.logger.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash().Hex()[2:10])
return head, nil
case <-timeout:
log.Debug(fmt.Sprintf("%v: head header timeout", p))
p.logger.Debug("Waiting for head header timed out", "elapsed", ttl)
return nil, errTimeout
case <-d.bodyCh:
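A recurring micro-change in this diff is visible above: the request TTL is captured in a local variable before arming the timer, so the timeout branch can report the allowance that was exceeded. A stripped-down, runnable sketch of the shape; the channel and TTL value are placeholders, not the downloader's own:

package main

import (
    "fmt"
    "time"
)

func main() {
    headerCh := make(chan int)    // placeholder for the downloader's header channel
    ttl := 500 * time.Millisecond // placeholder for d.requestTTL()
    timeout := time.After(ttl)

    select {
    case head := <-headerCh:
        fmt.Println("Remote head header identified", "number", head)
    case <-timeout:
        // Because ttl was captured above, the timeout entry can say how long we waited.
        fmt.Println("Waiting for head header timed out", "elapsed", ttl)
    }
}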
@@ -566,10 +565,10 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
log.Debug(fmt.Sprintf("%v: looking for common ancestor (remote height %d)", p, height))
// Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.headHeader().Number.Uint64()
p.logger.Debug("Looking for common ancestor", "local", ceil, "remote", height)
if d.mode == FullSync {
ceil = d.headBlock().NumberU64()
} else if d.mode == FastSync {
@@ -597,7 +596,9 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
timeout := time.After(d.requestTTL())
ttl := d.requestTTL()
timeout := time.After(ttl)
for finished := false; !finished; {
select {
@@ -607,19 +608,19 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packet.PeerId()))
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) == 0 {
log.Warn(fmt.Sprintf("%v: empty head header set", p))
p.logger.Warn("Empty head header set")
return 0, errEmptyHeaderSet
}
// Make sure the peer's reply conforms to the request
for i := 0; i < len(headers); i++ {
if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
log.Warn(fmt.Sprintf("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number))
p.logger.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
return 0, errInvalidChain
}
}
@@ -636,7 +637,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// If every header is known, even future ones, the peer straight out lied about its head
if number > height && i == limit-1 {
log.Warn(fmt.Sprintf("%v: lied about chain head: reported %d, found above %d", p, height, number))
p.logger.Warn("Lied about chain head", "reported", height, "found", number)
return 0, errStallingPeer
}
break
@@ -644,7 +645,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
}
case <-timeout:
log.Debug(fmt.Sprintf("%v: head header timeout", p))
p.logger.Debug("Waiting for head header timed out", "elapsed", ttl)
return 0, errTimeout
case <-d.bodyCh:
@@ -656,10 +657,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// If the head fetch already found an ancestor, return
if !common.EmptyHash(hash) {
if int64(number) <= floor {
log.Warn(fmt.Sprintf("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor))
p.logger.Warn("Ancestor below allowance", "number", number, "hash", hash.Hex()[2:10], "allowance", floor)
return 0, errInvalidAncestor
}
log.Debug(fmt.Sprintf("%v: common ancestor: #%d [%x…]", p, number, hash[:4]))
p.logger.Debug("Found common ancestor", "number", number, "hash", hash.Hex()[2:10])
return number, nil
}
// Ancestor not found, we need to binary search over our chain
@@ -671,7 +672,9 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
timeout := time.After(d.requestTTL())
ttl := d.requestTTL()
timeout := time.After(ttl)
go p.getAbsHeaders(uint64(check), 1, 0, false)
// Wait until a reply arrives to this request
@@ -683,13 +686,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
case packer := <-d.headerCh:
// Discard anything not from the origin peer
if packer.PeerId() != p.id {
log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packer.PeerId()))
log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packer.(*headerPack).headers
if len(headers) != 1 {
log.Debug(fmt.Sprintf("%v: invalid search header set (%d)", p, len(headers)))
p.logger.Debug("Multiple headers for single request", "headers", len(headers))
return 0, errBadPeer
}
arrived = true
@@ -701,13 +704,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
}
header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
log.Debug(fmt.Sprintf("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check))
p.logger.Debug("Received non requested header", "number", header.Number, "hash", header.Hash().Hex()[2:10], "request", check)
return 0, errBadPeer
}
start = check
case <-timeout:
log.Debug(fmt.Sprintf("%v: search header timeout", p))
p.logger.Debug("Waiting for search header timed out", "elapsed", ttl)
return 0, errTimeout
case <-d.bodyCh:
@@ -719,10 +722,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
}
// Ensure valid ancestry and return
if int64(start) <= floor {
log.Warn(fmt.Sprintf("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor))
p.logger.Warn("Ancestor below allowance", "number", start, "hash", hash.Hex()[2:10], "allowance", floor)
return 0, errInvalidAncestor
}
log.Debug(fmt.Sprintf("%v: common ancestor: #%d [%x…]", p, start, hash[:4]))
p.logger.Debug("Found common ancestor", "number", start, "hash", hash.Hex()[2:10])
return start, nil
}
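When none of the probed head headers (spaced 16 blocks apart above) match, findAncestor bisects the chain, asking the peer for one header per step and checking whether it is already known locally. A simplified sketch of that bisection; hasLocally is a hypothetical stand-in for the getHeader/getBlock lookups, and the floor check mirrors the rewrite-attack guard above:

// Simplified sketch of the ancestor bisection; hasLocally is hypothetical.
func bisectAncestor(floor int64, ceil, height uint64, hasLocally func(uint64) bool) (uint64, error) {
    start, end := uint64(0), height
    if ceil < end {
        end = ceil
    }
    for start+1 < end {
        check := (start + end) / 2
        if hasLocally(check) {
            start = check // common history extends at least to check
        } else {
            end = check // divergence is at or below check
        }
    }
    if int64(start) <= floor {
        return 0, errInvalidAncestor // ancestor below allowance: treat as a rewrite attempt
    }
    return start, nil
}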
@@ -735,8 +738,8 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
log.Debug(fmt.Sprintf("%v: directing header downloads from #%d", p, from))
defer log.Debug(fmt.Sprintf("%v: header download terminated", p))
p.logger.Debug("Directing header downloads", "origin", from)
defer p.logger.Debug("Header download terminated")
// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
@@ -745,15 +748,18 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
var ttl time.Duration
getHeaders := func(from uint64) {
request = time.Now()
timeout.Reset(d.requestTTL())
ttl = d.requestTTL()
timeout.Reset(ttl)
if skeleton {
log.Trace(fmt.Sprintf("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from))
p.logger.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
} else {
log.Trace(fmt.Sprintf("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from))
p.logger.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
}
}
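The skeleton request in getHeaders above fetches only every MaxHeaderFetch-th header: the first anchor sits at the end of the first batch, and MaxHeaderFetch-1 headers are skipped between anchors, for up to MaxSkeletonSize anchors per request. A small worked example; the constant values are assumptions for illustration, not necessarily the ones defined in this package:

package main

import "fmt"

func main() {
    // Assumed values, for illustration only.
    const (
        maxHeaderFetch  = 192 // headers per contiguous batch
        maxSkeletonSize = 128 // skeleton anchors per request
    )
    from := uint64(1000)

    // Mirrors getAbsHeaders(from+MaxHeaderFetch-1, MaxSkeletonSize, MaxHeaderFetch-1, false):
    // one anchor marking the end of each future batch, later filled by fillHeaderSkeleton.
    for i := uint64(0); i < 3; i++ {
        anchor := from + maxHeaderFetch - 1 + i*maxHeaderFetch
        fmt.Println("skeleton anchor", i, "=> header", anchor) // 1191, 1383, 1575
    }
}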
@@ -768,7 +774,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
log.Debug(fmt.Sprintf("Received skeleton headers from incorrect peer (%s)", packet.PeerId()))
log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
@@ -782,7 +788,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
}
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
log.Debug(fmt.Sprintf("%v: no available headers", p))
p.logger.Debug("No more headers available")
select {
case d.headerProcCh <- nil:
return nil
@@ -796,7 +802,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
log.Debug(fmt.Sprintf("%v: skeleton chain invalid: %v", p, err))
p.logger.Debug("Skeleton chain invalid", "error", err)
return errInvalidChain
}
headers = filled[proced:]
@@ -804,7 +810,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
log.Trace(fmt.Sprintf("%v: schedule %d headers from #%d", p, len(headers), from))
p.logger.Trace("Scheduling new headers", "count", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
@@ -816,7 +822,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
case <-timeout.C:
// Header retrieval timed out, consider the peer bad and drop
log.Debug(fmt.Sprintf("%v: header request timed out", p))
p.logger.Debug("Header request timed out", "elapsed", ttl)
headerTimeoutMeter.Mark(1)
d.dropPeer(p.id)
@@ -846,7 +852,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
log.Debug(fmt.Sprintf("Filling up skeleton from #%d", from))
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)
var (
@@ -865,9 +871,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
log.Debug(fmt.Sprintf("Skeleton fill terminated: %v", err))
log.Debug("Skeleton fill terminated", "error", err)
filled, proced := d.queue.RetrieveHeaders()
return filled, proced, err
@@ -877,7 +883,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
log.Debug(fmt.Sprintf("Downloading block bodies from #%d", from))
log.Debug("Downloading block bodies", "origin", from)
var (
deliver = func(packet dataPack) (int, error) {
@@ -891,9 +897,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
log.Debug(fmt.Sprintf("Block body download terminated: %v", err))
log.Debug("Block body download terminated", "error", err)
return err
}
@@ -901,7 +907,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
log.Debug(fmt.Sprintf("Downloading receipts from #%d", from))
log.Debug("Downloading transaction receipts", "origin", from)
var (
deliver = func(packet dataPack) (int, error) {
@@ -915,9 +921,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
log.Debug(fmt.Sprintf("Receipt download terminated: %v", err))
log.Debug("Transaction receipt download terminated", "error", err)
return err
}
@@ -925,7 +931,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// available peers, reserving a chunk of nodes for each, waiting for delivery and
// also periodically checking for timeouts.
func (d *Downloader) fetchNodeData() error {
log.Debug(fmt.Sprintf("Downloading node state data"))
log.Debug("Downloading node state data")
var (
deliver = func(packet dataPack) (int, error) {
@@ -933,12 +939,12 @@ func (d *Downloader) fetchNodeData() error {
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
// If the peer returned old-requested data, forgive
if err == trie.ErrNotRequested {
log.Debug(fmt.Sprintf("peer %s: replied to stale state request, forgiving", packet.PeerId()))
log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
return
}
if err != nil {
// If the node data processing failed, the root hash is very wrong, abort
log.Error(fmt.Sprintf("peer %s: state processing failed: %v", packet.PeerId(), err))
log.Error("State processing failed", "peer", packet.PeerId(), "error", err)
d.cancel()
return
}
@@ -957,12 +963,12 @@ func (d *Downloader) fetchNodeData() error {
// If real database progress was made, reset any fast-sync pivot failure
if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
log.Debug(fmt.Sprintf("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails)))
log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
}
// Log a message to the user and return
if delivered > 0 {
log.Info(fmt.Sprintf("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending))
log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
}
})
}
@@ -977,9 +983,9 @@ func (d *Downloader) fetchNodeData() error {
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
log.Debug(fmt.Sprintf("Node state data download terminated: %v", err))
log.Debug("Node state data download terminated", "error", err)
return err
}
@@ -1044,11 +1050,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// Issue a log to the user to see what's going on
switch {
case err == nil && packet.Items() == 0:
log.Trace(fmt.Sprintf("%s: no %s delivered", peer, strings.ToLower(kind)))
peer.logger.Trace("Requested data not delivered", "type", kind)
case err == nil:
log.Trace(fmt.Sprintf("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)))
peer.logger.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
default:
log.Trace(fmt.Sprintf("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err))
peer.logger.Trace("Failed to deliver retrieved data", "type", kind, "error", err)
}
}
// Blocks assembled, try to update the progress
@@ -1091,10 +1097,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// and latency of a peer separately, which requires pushing the measured capacity a bit and seeing
// how response times react, so it always requests one more than the minimum (i.e. min 2).
if fails > 2 {
log.Trace(fmt.Sprintf("%s: %s delivery timeout", peer, strings.ToLower(kind)))
peer.logger.Trace("Data delivery timed out", "type", kind)
setIdle(peer, 0)
} else {
log.Debug(fmt.Sprintf("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind)))
peer.logger.Debug("Stalling delivery, dropping", "type", kind)
d.dropPeer(pid)
}
}
@@ -1102,7 +1108,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// If there's nothing more to fetch, wait or terminate
if pending() == 0 {
if !inFlight() && finished {
log.Debug(fmt.Sprintf("%s fetching completed", kind))
log.Debug("Data fetching completed", "type", kind)
return nil
}
break
@@ -1130,15 +1136,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if request == nil {
continue
}
log.Trace("", "msg", log.Lazy{Fn: func() string {
if request.From > 0 {
return fmt.Sprintf("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
} else if len(request.Headers) > 0 {
return fmt.Sprintf("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
} else {
return fmt.Sprintf("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
}
}})
if request.From > 0 {
peer.logger.Trace("Requesting new batch of data", "type", kind, "from", request.From)
} else if len(request.Headers) > 0 {
peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
} else {
peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
@@ -1149,7 +1153,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// case, the internal state of the downloader and the queue is very wrong so
// better hard crash and note the error instead of silently accumulating into
// a much bigger issue.
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
}
running = true
}
@@ -1193,8 +1197,10 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if d.headBlock != nil {
curBlock = d.headBlock().Number()
}
log.Warn(fmt.Sprintf("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, curFastBlock, lastBlock, curBlock))
log.Warn("Rolled back headers", "count", len(hashes),
"header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
// If we're already past the pivot point, this could be an attack, tread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
@@ -1202,7 +1208,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if atomic.LoadUint32(&d.fsPivotFails) == 0 {
for _, header := range rollback {
if header.Number.Uint64() == pivot {
log.Warn(fmt.Sprintf("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]))
log.Warn("Fast-sync critical section failure, locked pivot to header", "number", pivot, "hash", header.Hash().Hex()[2:10])
d.fsPivotLock = header
}
}
@@ -1298,7 +1304,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if n > 0 {
rollback = append(rollback, chunk[:n]...)
}
log.Debug(fmt.Sprintf("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err))
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash().Hex()[2:10], "error", err)
return errInvalidChain
}
// All verifications passed, store newly found uncertain headers
@@ -1310,7 +1316,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
log.Warn(fmt.Sprintf("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4]))
log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash().Hex()[2:10], "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash().Hex()[2:10])
return errInvalidChain
}
}
@@ -1327,7 +1333,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
log.Debug(fmt.Sprintf("stale headers"))
log.Debug("Stale headers")
return errBadPeer
}
}
@@ -1358,10 +1364,15 @@ func (d *Downloader) processContent() error {
d.chainInsertHook(results)
}
// Actually import the blocks
log.Debug("", "msg", log.Lazy{Fn: func() string {
first, last := results[0].Header, results[len(results)-1].Header
return fmt.Sprintf("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
}})
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"from", log.Lazy{Fn: func() string {
return fmt.Sprintf("#%d [%x…]", first.Number, first.Hash().Bytes()[:4])
}},
"till", log.Lazy{Fn: func() string {
return fmt.Sprintf("#%d [%x…]", last.Number, last.Hash().Bytes()[:4])
}})
for len(results) != 0 {
// Check for any termination requests
select {
@@ -1395,14 +1406,14 @@ func (d *Downloader) processContent() error {
case len(receipts) > 0:
index, err = d.insertReceipts(blocks, receipts)
if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
log.Debug(fmt.Sprintf("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4]))
log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash().Hex()[2:10])
index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
}
default:
index, err = d.insertBlocks(blocks)
}
if err != nil {
log.Debug(fmt.Sprintf("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err))
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash().Hex()[2:10], "error", err)
return errInvalidChain
}
// Shift the results to the next batch
@@ -1470,7 +1481,7 @@ func (d *Downloader) qosTuner() {
atomic.StoreUint64(&d.rttConfidence, conf)
// Log the new QoS values and sleep until the next RTT
log.Debug(fmt.Sprintf("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()))
log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
select {
case <-d.quitCh:
return
@@ -1500,7 +1511,7 @@ func (d *Downloader) qosReduceConfidence() {
atomic.StoreUint64(&d.rttConfidence, conf)
rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
log.Debug(fmt.Sprintf("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()))
log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
}
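The QoS logs above print the confidence as float64(conf)/1000000.0 because the value is stored in millionths inside a uint64, which lets it be read and updated atomically. A tiny sketch of that fixed-point conversion; the values are illustrative:

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    // Confidence is kept in millionths (1000000 == 1.0) so it fits atomic uint64 operations.
    var rttConfidence uint64 = 1000000

    // Example reduction, e.g. after a new peer joins and dilutes the estimate.
    atomic.StoreUint64(&rttConfidence, 750000)

    conf := atomic.LoadUint64(&rttConfidence)
    fmt.Printf("confidence %.3f\n", float64(conf)/1000000.0) // confidence 0.750
}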
// requestRTT returns the current target round trip time for a download request