@ -56,6 +56,7 @@ type blockPack struct {
type syncPack struct {
type syncPack struct {
peer * peer
peer * peer
hash common . Hash
hash common . Hash
ignoreInitial bool
}
}
func New ( hasBlock hashCheckFn , insertChain chainInsertFn , currentTd currentTdFn ) * Downloader {
func New ( hasBlock hashCheckFn , insertChain chainInsertFn , currentTd currentTdFn ) * Downloader {
@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) {
func ( d * Downloader ) peerHandler ( ) {
func ( d * Downloader ) peerHandler ( ) {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
itimer := time . NewTicker ( 5 * time . Second )
//itimer := time.NewTicker(5 * time.Second)
itimer := time . NewTimer ( 5 * time . Second )
out :
out :
for {
for {
select {
select {
case <- d . newPeerCh :
case <- d . newPeerCh :
itimer . Stop ( )
// Meet the `minDesiredPeerCount` before we select our best peer
// Meet the `minDesiredPeerCount` before we select our best peer
if len ( d . peers ) < minDesiredPeerCount {
if len ( d . peers ) < minDesiredPeerCount {
break
break
@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) {
}
}
glog . V ( logger . Detail ) . Infoln ( "New peer with highest TD =" , p . td )
glog . V ( logger . Detail ) . Infoln ( "New peer with highest TD =" , p . td )
d . syncCh <- syncPack { p , p . recentHash }
d . syncCh <- syncPack { p , p . recentHash , false }
}
}
}
}
@ -147,11 +150,11 @@ out:
select {
select {
case sync := <- d . syncCh :
case sync := <- d . syncCh :
selectedPeer := sync . peer
selectedPeer := sync . peer
glog . V ( logger . Detail ) . Infoln ( "Synchronising with network using:" , selectedPeer . id )
glog . V ( logger . Detail ) . Infoln ( "Synchronising with the network using:" , selectedPeer . id )
// Start the fetcher. This will block the update entirely
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// interupts need to be send to the appropriate channels
// respectively.
// respectively.
if err := d . startFetchingHashes ( selectedPeer , sync . hash ) ; err != nil {
if err := d . startFetchingHashes ( selectedPeer , sync . hash , sync . ignoreInitial ) ; err != nil {
// handle error
// handle error
glog . V ( logger . Debug ) . Infoln ( "Error fetching hashes:" , err )
glog . V ( logger . Debug ) . Infoln ( "Error fetching hashes:" , err )
// XXX Reset
// XXX Reset
@ -178,11 +181,18 @@ out:
}
}
// XXX Make synchronous
// XXX Make synchronous
func ( d * Downloader ) startFetchingHashes ( p * peer , hash common . Hash ) error {
func ( d * Downloader ) startFetchingHashes ( p * peer , hash common . Hash , ignoreInitial bool ) error {
glog . V ( logger . Debug ) . Infoln ( "Downloading hashes" )
glog . V ( logger . Debug ) . Infof ( "Downloading hashes (%x) from %s " , hash . Bytes ( ) [ : 4 ] , p . id )
start := time . Now ( )
start := time . Now ( )
// We ignore the initial hash in some cases (e.g. we received a block without it's parent)
// In such circumstances we don't need to download the block so don't add it to the queue.
if ! ignoreInitial {
// Add the hash to the queue first
d . queue . hashPool . Add ( hash )
}
// Get the first batch of hashes
// Get the first batch of hashes
p . getHashes ( hash )
p . getHashes ( hash )
atomic . StoreInt32 ( & d . fetchingHashes , 1 )
atomic . StoreInt32 ( & d . fetchingHashes , 1 )
@ -195,7 +205,7 @@ out:
hashSet := set . New ( )
hashSet := set . New ( )
for _ , hash := range hashes {
for _ , hash := range hashes {
if d . hasBlock ( hash ) {
if d . hasBlock ( hash ) {
glog . V ( logger . Debug ) . Infof ( "Found common hash %x\n" , hash )
glog . V ( logger . Debug ) . Infof ( "Found common hash %x\n" , hash [ : 4 ] )
done = true
done = true
break
break
@ -207,7 +217,7 @@ out:
// Add hashes to the chunk set
// Add hashes to the chunk set
// Check if we're done fetching
// Check if we're done fetching
if ! done {
if ! done && len ( hashes ) > 0 {
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
// Get the next set of hashes
// Get the next set of hashes
p . getHashes ( hashes [ len ( hashes ) - 1 ] )
p . getHashes ( hashes [ len ( hashes ) - 1 ] )
@ -218,7 +228,7 @@ out:
}
}
}
}
}
}
glog . V ( logger . Detail ) . Infoln ( "Download hashes: done. Took" , time . Since ( start ) )
glog . V ( logger . Detail ) . Infof ( "Downloaded hashes (%d). Took %v\n" , d . queue . hashPool . Size ( ) , time . Since ( start ) )
return nil
return nil
}
}
@ -242,6 +252,10 @@ out:
// from the available peers.
// from the available peers.
if d . queue . hashPool . Size ( ) > 0 {
if d . queue . hashPool . Size ( ) > 0 {
availablePeers := d . peers . get ( idleState )
availablePeers := d . peers . get ( idleState )
if len ( availablePeers ) == 0 {
glog . V ( logger . Detail ) . Infoln ( "No peers available out of" , len ( d . peers ) )
}
for _ , peer := range availablePeers {
for _ , peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
// could be returned due to no hashes available.
@ -317,23 +331,35 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
return
return
}
}
glog . V ( logger . Detail ) . Infoln ( "Inserting new block from:" , id )
peer := d . peers . getPeer ( id )
d . queue . addBlock ( id , block , td )
// if the peer is in our healthy list of peers; update the td
// if the peer is in our healthy list of peers; update the td
// here is a good chance to add the peer back to the list
// and add the block. Otherwise just ignore it
if peer := d . peers . getPeer ( id ) ; peer != nil {
if peer == nil {
glog . V ( logger . Detail ) . Infof ( "Ignored block from bad peer %s\n" , id )
return
}
peer . mu . Lock ( )
peer . mu . Lock ( )
peer . td = td
peer . td = td
peer . recentHash = block . Hash ( )
peer . recentHash = block . Hash ( )
peer . mu . Unlock ( )
peer . mu . Unlock ( )
}
glog . V ( logger . Detail ) . Infoln ( "Inserting new block from:" , id )
d . queue . addBlock ( id , block , td )
// if neither go ahead to process
// if neither go ahead to process
if ! ( d . isFetchingHashes ( ) || d . isDownloadingBlocks ( ) ) {
if ! ( d . isFetchingHashes ( ) || d . isDownloadingBlocks ( ) ) {
// Check if the parent of the received block is known.
// If the block is not know, request it otherwise, request.
phash := block . ParentHash ( )
if ! d . hasBlock ( phash ) {
glog . V ( logger . Detail ) . Infof ( "Missing parent %x, requires fetching\n" , phash . Bytes ( ) [ : 4 ] )
d . syncCh <- syncPack { peer , peer . recentHash , true }
} else {
d . process ( )
d . process ( )
}
}
}
}
}
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
// the protocol handler.
// the protocol handler.
@ -369,7 +395,7 @@ func (d *Downloader) process() error {
// TODO change this. This shite
// TODO change this. This shite
for i , block := range blocks [ : max ] {
for i , block := range blocks [ : max ] {
if ! d . hasBlock ( block . ParentHash ( ) ) {
if ! d . hasBlock ( block . ParentHash ( ) ) {
d . syncCh <- syncPack { d . peers . bestPeer ( ) , block . Hash ( ) }
d . syncCh <- syncPack { d . peers . bestPeer ( ) , block . Hash ( ) , true }
// remove processed blocks
// remove processed blocks
blocks = blocks [ i : ]
blocks = blocks [ i : ]