// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package fetcher contains the block announcement based synchronisation.
package fetcher
import (
"errors"
"fmt"
"math/rand"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
// Timing allowances and per-peer/per-chain limits used by the fetcher.
const (
arriveTimeout = 500 * time . Millisecond // Time allowance before an announced block is explicitly requested
gatherSlack = 100 * time . Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time . Second // Maximum allotted time to return an explicitly requested block
maxUncleDist = 7 // Maximum allowed backward distance from the chain head
maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
hashLimit = 256 // Maximum number of unique blocks a peer may have announced
blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
)
var (
// errTerminated is returned by Notify and Enqueue when the fetcher's quit
// channel is closed before the request could be handed to the main loop.
errTerminated = errors . New ( "terminated" )
)
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
// The fetcher treats a nil result as "block not known locally".
type blockRetrievalFn func ( common . Hash ) * types . Block
// blockRequesterFn is a callback type for sending a block retrieval request
// for a batch of hashes (used for the eth/61 whole-block fetch of an announce).
type blockRequesterFn func ( [ ] common . Hash ) error
// headerRequesterFn is a callback type for sending a header retrieval request
// for a single hash (used for the eth/62 header fetch of an announce).
type headerRequesterFn func ( common . Hash ) error
// bodyRequesterFn is a callback type for sending a body retrieval request
// for a batch of hashes (used for the eth/62 body fetch of an announce).
type bodyRequesterFn func ( [ ] common . Hash ) error
// blockValidatorFn is a callback type to verify a block's header for fast propagation.
// It receives the block together with its already known parent.
type blockValidatorFn func ( block * types . Block , parent * types . Block ) error
// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
// The fetcher calls it with propagate set to true right after header validation
// and with propagate set to false after a successful import.
type blockBroadcasterFn func ( block * types . Block , propagate bool )
// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func ( ) uint64
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
// The fetcher only inspects the returned error.
type chainInsertFn func ( types . Blocks ) ( int , error )
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func ( id string )
// announce is the hash notification of the availability of a new block in the
// network. One announce is created per Notify call; the header and time fields
// are overwritten by the main loop once the announced header has been delivered.
type announce struct {
hash common . Hash // Hash of the block being announced
number uint64 // Number of the block being announced (0 = unknown | old protocol)
header * types . Header // Header of the block partially reassembled (new protocol)
time time . Time // Timestamp of the announcement
origin string // Identifier of the peer originating the notification
fetch61 blockRequesterFn // [eth/61] Fetcher function to retrieve an announced block
fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block
fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block
}
// headerFilterTask represents a batch of headers needing fetcher filtering. The
// same type carries the request (all delivered headers) and the reply (the
// headers the fetcher did not claim) over the filter channel.
type headerFilterTask struct {
headers [ ] * types . Header // Collection of headers to filter
time time . Time // Arrival time of the headers
}
// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering. The transactions and uncles slices are parallel:
// index i of each belongs to the same block body.
type bodyFilterTask struct {
transactions [ ] [ ] * types . Transaction // Collection of transactions per block bodies
uncles [ ] [ ] * types . Header // Collection of uncles per block bodies
time time . Time // Arrival time of the blocks' contents
}
// inject represents a scheduled import operation.
type inject struct {
origin string // Identifier of the peer that delivered the block
block * types . Block // Block waiting to be imported
}
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
//
// The exported methods only communicate with the main loop through the event
// channels below; the bookkeeping maps and the import queue are manipulated by
// the loop goroutine and the helpers it calls.
type Fetcher struct {
// Various event channels
notify chan * announce // Announcements submitted through Notify
inject chan * inject // Blocks submitted through Enqueue
blockFilter chan chan [ ] * types . Block // Exchange channels submitted through FilterBlocks
headerFilter chan chan * headerFilterTask // Exchange channels submitted through FilterHeaders
bodyFilter chan chan * bodyFilterTask // Exchange channels submitted through FilterBodies
done chan common . Hash // Hashes of blocks whose import goroutine has finished
quit chan struct { } // Closed by Stop to terminate the loop and unblock callers
// Announce states
announces map [ string ] int // Per peer announce counts to prevent memory exhaustion
announced map [ common . Hash ] [ ] * announce // Announced blocks, scheduled for fetching
fetching map [ common . Hash ] * announce // Announced blocks, currently fetching
fetched map [ common . Hash ] [ ] * announce // Blocks with headers fetched, scheduled for body retrieval
completing map [ common . Hash ] * announce // Blocks with headers, currently body-completing
// Block cache
queue * prque . Prque // Queue containing the import operations (block number sorted)
queues map [ string ] int // Per peer block counts to prevent memory exhaustion
queued map [ common . Hash ] * inject // Set of already queued blocks (to dedup imports)
// Callbacks
getBlock blockRetrievalFn // Retrieves a block from the local chain
validateBlock blockValidatorFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
// Testing hooks (left nil by New; every call site checks for nil first)
announceChangeHook func ( common . Hash , bool ) // Method to call upon adding or deleting a hash from the announce list
queueChangeHook func ( common . Hash , bool ) // Method to call upon adding or deleting a block from the import queue
fetchingHook func ( [ ] common . Hash ) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func ( [ ] common . Hash ) // Method to call upon starting a block body fetch (eth/62)
importedHook func ( * types . Block ) // Method to call upon successful block import (both eth/61 and eth/62)
}
// New builds a Fetcher around the supplied chain callbacks. Every event channel
// is unbuffered, every bookkeeping map starts out empty and the testing hooks
// are left nil. The returned fetcher is idle until Start is called.
func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
	fetcher := new(Fetcher)

	// Callbacks into the rest of the node
	fetcher.getBlock = getBlock
	fetcher.validateBlock = validateBlock
	fetcher.broadcastBlock = broadcastBlock
	fetcher.chainHeight = chainHeight
	fetcher.insertChain = insertChain
	fetcher.dropPeer = dropPeer

	// Event channels connecting the public API with the main loop
	fetcher.notify = make(chan *announce)
	fetcher.inject = make(chan *inject)
	fetcher.blockFilter = make(chan chan []*types.Block)
	fetcher.headerFilter = make(chan chan *headerFilterTask)
	fetcher.bodyFilter = make(chan chan *bodyFilterTask)
	fetcher.done = make(chan common.Hash)
	fetcher.quit = make(chan struct{})

	// Announcement tracking
	fetcher.announces = make(map[string]int)
	fetcher.announced = make(map[common.Hash][]*announce)
	fetcher.fetching = make(map[common.Hash]*announce)
	fetcher.fetched = make(map[common.Hash][]*announce)
	fetcher.completing = make(map[common.Hash]*announce)

	// Import queue
	fetcher.queue = prque.New()
	fetcher.queues = make(map[string]int)
	fetcher.queued = make(map[common.Hash]*inject)

	return fetcher
}
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested. The main
// loop is run on its own goroutine, so Start returns immediately.
func ( f * Fetcher ) Start ( ) {
go f . loop ( )
}
// Stop terminates the announcement based synchroniser, canceling all pending
// operations. It does so by closing the quit channel, hence it must be called
// at most once: closing an already closed channel panics.
func ( f * Fetcher ) Stop ( ) {
close ( f . quit )
}
// Notify tells the fetcher that a peer claims to have a new block available.
// The announcement is packaged together with the peer specific retrieval
// functions and handed over to the main loop. It blocks until the loop has
// accepted the announcement, returning nil, or until the fetcher is stopped,
// returning errTerminated.
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	blockFetcher blockRequesterFn, // eth/61 specific whole block fetcher
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	notification := &announce{
		origin:      peer,
		hash:        hash,
		number:      number,
		time:        time,
		fetch61:     blockFetcher,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case <-f.quit:
		return errTerminated
	case f.notify <- notification:
		return nil
	}
}
// Enqueue tries to fill gaps in the fetcher's future import queue by handing a
// directly delivered block to the main loop. It blocks until the loop has
// accepted the block, returning nil, or until the fetcher is stopped, returning
// errTerminated.
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
	request := &inject{block: block, origin: peer}

	select {
	case <-f.quit:
		return errTerminated
	case f.inject <- request:
		return nil
	}
}
// FilterBlocks passes a batch of delivered blocks through the fetcher, which
// keeps those it explicitly requested, and returns the remainder so they can be
// handled elsewhere. If the fetcher is stopped at any point of the exchange,
// nil is returned.
func (f *Fetcher) FilterBlocks(blocks types.Blocks) types.Blocks {
	glog.V(logger.Detail).Infof("[eth/61] filtering %d blocks", len(blocks))

	// Step 1: hand a private exchange channel to the main loop
	exchange := make(chan []*types.Block)
	select {
	case <-f.quit:
		return nil
	case f.blockFilter <- exchange:
	}
	// Step 2: push the delivered blocks through the exchange channel
	select {
	case <-f.quit:
		return nil
	case exchange <- blocks:
	}
	// Step 3: collect whatever the fetcher did not claim
	select {
	case <-f.quit:
		return nil
	case remaining := <-exchange:
		return remaining
	}
}
// FilterHeaders passes a batch of delivered headers (with their arrival time)
// through the fetcher, which keeps those it explicitly requested, and returns
// the remainder so they can be handled elsewhere. If the fetcher is stopped at
// any point of the exchange, nil is returned.
func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
	glog.V(logger.Detail).Infof("[eth/62] filtering %d headers", len(headers))

	// Step 1: hand a private exchange channel to the main loop
	exchange := make(chan *headerFilterTask)
	select {
	case <-f.quit:
		return nil
	case f.headerFilter <- exchange:
	}
	// Step 2: push the delivered headers through the exchange channel
	request := &headerFilterTask{time: time, headers: headers}
	select {
	case <-f.quit:
		return nil
	case exchange <- request:
	}
	// Step 3: collect whatever the fetcher did not claim
	select {
	case <-f.quit:
		return nil
	case reply := <-exchange:
		return reply.headers
	}
}
// FilterBodies passes a batch of delivered block bodies (parallel transaction
// and uncle lists, with their arrival time) through the fetcher, which keeps
// those it explicitly requested, and returns the remainder so they can be
// handled elsewhere. If the fetcher is stopped at any point of the exchange,
// nil slices are returned.
func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
	glog.V(logger.Detail).Infof("[eth/62] filtering %d:%d bodies", len(transactions), len(uncles))

	// Step 1: hand a private exchange channel to the main loop
	exchange := make(chan *bodyFilterTask)
	select {
	case <-f.quit:
		return nil, nil
	case f.bodyFilter <- exchange:
	}
	// Step 2: push the delivered bodies through the exchange channel
	request := &bodyFilterTask{time: time, uncles: uncles, transactions: transactions}
	select {
	case <-f.quit:
		return nil, nil
	case exchange <- request:
	}
	// Step 3: collect whatever the fetcher did not claim
	select {
	case <-f.quit:
		return nil, nil
	case reply := <-exchange:
		return reply.transactions, reply.uncles
	}
}
// loop is the main fetcher loop, checking and processing various notification
// events. Every iteration first expires timed out fetches and imports whatever
// queued blocks fit onto the chain, then blocks until one event arrives:
// a quit request, an announcement, a direct block injection, a finished import,
// one of the two scheduling timers, or a block/header/body filter request.
// It returns only once the quit channel is closed.
func ( f * Fetcher ) loop ( ) {
// Iterate the block fetching until a quit is requested
// Both timers start with a zero duration, so each fires once right away; the
// corresponding cases are harmless no-ops while nothing is pending.
fetchTimer := time . NewTimer ( 0 )
completeTimer := time . NewTimer ( 0 )
for {
// Clean up any expired block fetches
for hash , announce := range f . fetching {
if time . Since ( announce . time ) > fetchTimeout {
f . forgetHash ( hash )
}
}
// Import any queued blocks that could potentially fit
height := f . chainHeight ( )
for ! f . queue . Empty ( ) {
op := f . queue . PopItem ( ) . ( * inject )
if f . queueChangeHook != nil {
f . queueChangeHook ( op . block . Hash ( ) , false )
}
// If too high up the chain or phase, continue later
number := op . block . NumberU64 ( )
if number > height + 1 {
// Put the operation back with its original priority and stop draining: the
// queue is block number sorted, so the remaining entries are presumably not
// importable yet either.
f . queue . Push ( op , - float32 ( op . block . NumberU64 ( ) ) )
if f . queueChangeHook != nil {
f . queueChangeHook ( op . block . Hash ( ) , true )
}
break
}
// Otherwise if fresh and still unknown, try and import
hash := op . block . Hash ( )
if number + maxUncleDist < height || f . getBlock ( hash ) != nil {
f . forgetBlock ( hash )
continue
}
f . insert ( op . origin , op . block )
}
// Wait for an outside event to occur
select {
case <- f . quit :
// Fetcher terminating, abort all operations
return
case notification := <- f . notify :
// A block was announced, make sure the peer isn't DOSing us
propAnnounceInMeter . Mark ( 1 )
count := f . announces [ notification . origin ] + 1
if count > hashLimit {
glog . V ( logger . Debug ) . Infof ( "Peer %s: exceeded outstanding announces (%d)" , notification . origin , hashLimit )
propAnnounceDOSMeter . Mark ( 1 )
// The break statements in this case leave the select, dropping the announcement
break
}
// If we have a valid block number, check that it's potentially useful
if notification . number > 0 {
if dist := int64 ( notification . number ) - int64 ( f . chainHeight ( ) ) ; dist < - maxUncleDist || dist > maxQueueDist {
glog . V ( logger . Debug ) . Infof ( "[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d" , notification . origin , notification . number , notification . hash [ : 4 ] , dist )
propAnnounceDropMeter . Mark ( 1 )
break
}
}
// All is well, schedule the announce if block's not yet downloading
if _ , ok := f . fetching [ notification . hash ] ; ok {
break
}
if _ , ok := f . completing [ notification . hash ] ; ok {
break
}
f . announces [ notification . origin ] = count
f . announced [ notification . hash ] = append ( f . announced [ notification . hash ] , notification )
// The hook is only invoked for the first announcement of a given hash
if f . announceChangeHook != nil && len ( f . announced [ notification . hash ] ) == 1 {
f . announceChangeHook ( notification . hash , true )
}
// The first pending hash (re)arms the timer; later ones are picked up when
// the timer case reschedules itself
if len ( f . announced ) == 1 {
f . rescheduleFetch ( fetchTimer )
}
case op := <- f . inject :
// A direct block insertion was requested, try and fill any pending gaps
propBroadcastInMeter . Mark ( 1 )
f . enqueue ( op . origin , op . block )
case hash := <- f . done :
// A pending import finished, remove all traces of the notification
f . forgetHash ( hash )
f . forgetBlock ( hash )
case <- fetchTimer . C :
// At least one block's timer ran out, check for needing retrieval
request := make ( map [ string ] [ ] common . Hash )
for hash , announces := range f . announced {
// Announcements expiring within gatherSlack are collated into this round too
if time . Since ( announces [ 0 ] . time ) > arriveTimeout - gatherSlack {
// Pick a random peer to retrieve from, reset all others
announce := announces [ rand . Intn ( len ( announces ) ) ]
f . forgetHash ( hash )
// If the block still didn't arrive, queue for fetching
if f . getBlock ( hash ) == nil {
request [ announce . origin ] = append ( request [ announce . origin ] , hash )
f . fetching [ hash ] = announce
}
}
}
// Send out all block (eth/61) or header (eth/62) requests
for peer , hashes := range request {
if glog . V ( logger . Detail ) && len ( hashes ) > 0 {
list := "["
for _ , hash := range hashes {
list += fmt . Sprintf ( "%x…, " , hash [ : 4 ] )
}
// Strip the trailing ", " separator before closing the bracket
list = list [ : len ( list ) - 2 ] + "]"
if f . fetching [ hashes [ 0 ] ] . fetch61 != nil {
glog . V ( logger . Detail ) . Infof ( "[eth/61] Peer %s: fetching blocks %s" , peer , list )
} else {
glog . V ( logger . Detail ) . Infof ( "[eth/62] Peer %s: fetching headers %s" , peer , list )
}
}
// Create a closure of the fetch and schedule it on a new thread. All hashes
// in one request share the same origin peer, so the retrieval functions of
// the first hash's announce are used for the whole batch; hashes is
// re-declared so the goroutine captures this iteration's value.
fetchBlocks , fetchHeader , hashes := f . fetching [ hashes [ 0 ] ] . fetch61 , f . fetching [ hashes [ 0 ] ] . fetchHeader , hashes
go func ( ) {
if f . fetchingHook != nil {
f . fetchingHook ( hashes )
}
if fetchBlocks != nil {
// Use old eth/61 protocol to retrieve whole blocks
blockFetchMeter . Mark ( int64 ( len ( hashes ) ) )
fetchBlocks ( hashes )
} else {
// Use new eth/62 protocol to retrieve headers first
for _ , hash := range hashes {
headerFetchMeter . Mark ( 1 )
fetchHeader ( hash ) // Suboptimal, but protocol doesn't allow batch header retrievals
}
}
} ( )
}
// Schedule the next fetch if blocks are still pending
f . rescheduleFetch ( fetchTimer )
case <- completeTimer . C :
// At least one header's timer ran out, retrieve everything
request := make ( map [ string ] [ ] common . Hash )
for hash , announces := range f . fetched {
// Pick a random peer to retrieve from, reset all others
announce := announces [ rand . Intn ( len ( announces ) ) ]
f . forgetHash ( hash )
// If the block still didn't arrive, queue for completion
if f . getBlock ( hash ) == nil {
request [ announce . origin ] = append ( request [ announce . origin ] , hash )
f . completing [ hash ] = announce
}
}
// Send out all block body requests
for peer , hashes := range request {
if glog . V ( logger . Detail ) && len ( hashes ) > 0 {
list := "["
for _ , hash := range hashes {
list += fmt . Sprintf ( "%x…, " , hash [ : 4 ] )
}
// Strip the trailing ", " separator before closing the bracket
list = list [ : len ( list ) - 2 ] + "]"
glog . V ( logger . Detail ) . Infof ( "[eth/62] Peer %s: fetching bodies %s" , peer , list )
}
// Run the testing hook synchronously, then issue the body fetch on a new thread
if f . completingHook != nil {
f . completingHook ( hashes )
}
bodyFetchMeter . Mark ( int64 ( len ( hashes ) ) )
go f . completing [ hashes [ 0 ] ] . fetchBodies ( hashes )
}
// Schedule the next fetch if blocks are still pending
f . rescheduleComplete ( completeTimer )
case filter := <- f . blockFilter :
// Blocks arrived, extract any explicit fetches, return all else
var blocks types . Blocks
select {
case blocks = <- filter :
case <- f . quit :
return
}
blockFilterInMeter . Mark ( int64 ( len ( blocks ) ) )
explicit , download := [ ] * types . Block { } , [ ] * types . Block { }
for _ , block := range blocks {
hash := block . Hash ( )
// Filter explicitly requested blocks from hash announcements
if f . fetching [ hash ] != nil && f . queued [ hash ] == nil {
// Discard if already imported by other means
if f . getBlock ( hash ) == nil {
explicit = append ( explicit , block )
} else {
f . forgetHash ( hash )
}
} else {
download = append ( download , block )
}
}
blockFilterOutMeter . Mark ( int64 ( len ( download ) ) )
// Reply to the caller before queueing, so it is not held up by the import scheduling
select {
case filter <- download :
case <- f . quit :
return
}
// Schedule the retrieved blocks for ordered import
for _ , block := range explicit {
if announce := f . fetching [ block . Hash ( ) ] ; announce != nil {
f . enqueue ( announce . origin , block )
}
}
case filter := <- f . headerFilter :
// Headers arrived from a remote peer. Extract those that were explicitly
// requested by the fetcher, and return everything else so it's delivered
// to other parts of the system.
var task * headerFilterTask
select {
case task = <- filter :
case <- f . quit :
return
}
headerFilterInMeter . Mark ( int64 ( len ( task . headers ) ) )
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
unknown , incomplete , complete := [ ] * types . Header { } , [ ] * announce { } , [ ] * types . Block { }
for _ , header := range task . headers {
hash := header . Hash ( )
// Filter fetcher-requested headers from other synchronisation algorithms
if announce := f . fetching [ hash ] ; announce != nil && f . fetched [ hash ] == nil && f . completing [ hash ] == nil && f . queued [ hash ] == nil {
// If the delivered header does not match the promised number, drop the announcer
if header . Number . Uint64 ( ) != announce . number {
glog . V ( logger . Detail ) . Infof ( "[eth/62] Peer %s: invalid block number for [%x…]: announced %d, provided %d" , announce . origin , header . Hash ( ) . Bytes ( ) [ : 4 ] , announce . number , header . Number . Uint64 ( ) )
f . dropPeer ( announce . origin )
f . forgetHash ( hash )
continue
}
// Only keep if not imported by other means
if f . getBlock ( hash ) == nil {
// From here on the announce carries the header and is timed from its arrival
announce . header = header
announce . time = task . time
// If the block is empty (header only), short circuit into the final import queue
if header . TxHash == types . DeriveSha ( types . Transactions { } ) && header . UncleHash == types . CalcUncleHash ( [ ] * types . Header { } ) {
glog . V ( logger . Detail ) . Infof ( "[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval" , announce . origin , header . Number . Uint64 ( ) , header . Hash ( ) . Bytes ( ) [ : 4 ] )
block := types . NewBlockWithHeader ( header )
block . ReceivedAt = task . time
complete = append ( complete , block )
f . completing [ hash ] = announce
continue
}
// Otherwise add to the list of blocks needing completion
incomplete = append ( incomplete , announce )
} else {
glog . V ( logger . Detail ) . Infof ( "[eth/62] Peer %s: block #%d [%x…] already imported, discarding header" , announce . origin , header . Number . Uint64 ( ) , header . Hash ( ) . Bytes ( ) [ : 4 ] )
f . forgetHash ( hash )
}
} else {
// Fetcher doesn't know about it, add to the return list
unknown = append ( unknown , header )
}
}
headerFilterOutMeter . Mark ( int64 ( len ( unknown ) ) )
select {
case filter <- & headerFilterTask { headers : unknown , time : task . time } :
case <- f . quit :
return
}
// Schedule the retrieved headers for body completion
for _ , announce := range incomplete {
hash := announce . header . Hash ( )
if _ , ok := f . completing [ hash ] ; ok {
continue
}
f . fetched [ hash ] = append ( f . fetched [ hash ] , announce )
if len ( f . fetched ) == 1 {
f . rescheduleComplete ( completeTimer )
}
}
// Schedule the header-only blocks for import
for _ , block := range complete {
if announce := f . completing [ block . Hash ( ) ] ; announce != nil {
f . enqueue ( announce . origin , block )
}
}
case filter := <- f . bodyFilter :
// Block bodies arrived, extract any explicitly requested blocks, return the rest
var task * bodyFilterTask
select {
case task = <- filter :
case <- f . quit :
return
}
bodyFilterInMeter . Mark ( int64 ( len ( task . transactions ) ) )
blocks := [ ] * types . Block { }
for i := 0 ; i < len ( task . transactions ) && i < len ( task . uncles ) ; i ++ {
// Match up a body to any possible completion request. A body carries no
// hash of its own, so it is matched by comparing its derived transaction
// and uncle hashes against the header of every block being completed.
matched := false
for hash , announce := range f . completing {
if f . queued [ hash ] == nil {
txnHash := types . DeriveSha ( types . Transactions ( task . transactions [ i ] ) )
uncleHash := types . CalcUncleHash ( task . uncles [ i ] )
if txnHash == announce . header . TxHash && uncleHash == announce . header . UncleHash {
// Mark the body matched, reassemble if still unknown
matched = true
if f . getBlock ( hash ) == nil {
block := types . NewBlockWithHeader ( announce . header ) . WithBody ( task . transactions [ i ] , task . uncles [ i ] )
block . ReceivedAt = task . time
blocks = append ( blocks , block )
} else {
f . forgetHash ( hash )
}
}
}
}
if matched {
// Cut the claimed body out of the task (in place) and step back so the
// element shifted into slot i is examined as well
task . transactions = append ( task . transactions [ : i ] , task . transactions [ i + 1 : ] ... )
task . uncles = append ( task . uncles [ : i ] , task . uncles [ i + 1 : ] ... )
i --
continue
}
}
bodyFilterOutMeter . Mark ( int64 ( len ( task . transactions ) ) )
select {
case filter <- task :
case <- f . quit :
return
}
// Schedule the retrieved blocks for ordered import
for _ , block := range blocks {
if announce := f . completing [ block . Hash ( ) ] ; announce != nil {
f . enqueue ( announce . origin , block )
}
}
}
}
}
// rescheduleFetch re-arms the given fetch timer so that it fires once the
// oldest pending announcement reaches its arrival timeout. Only the first
// announcement of each hash is considered. With nothing announced the timer is
// left untouched.
func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
	if len(f.announced) == 0 {
		// Nothing pending, nothing to wait for
		return
	}
	// Track the oldest first-announcement timestamp, starting from the current time
	oldest := time.Now()
	for _, pending := range f.announced {
		if first := pending[0].time; first.Before(oldest) {
			oldest = first
		}
	}
	// The remaining allowance may be zero or negative, making the timer fire at once
	fetch.Reset(arriveTimeout - time.Since(oldest))
}
// rescheduleComplete re-arms the given completion timer so that it fires
// gatherSlack after the oldest pending header was recorded. Only the first
// announce of each hash is considered. With no fetched headers pending the
// timer is left untouched.
func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
	if len(f.fetched) == 0 {
		// Nothing pending, nothing to wait for
		return
	}
	// Track the oldest header timestamp, starting from the current time
	oldest := time.Now()
	for _, pending := range f.fetched {
		if first := pending[0].time; first.Before(oldest) {
			oldest = first
		}
	}
	// The remaining slack may be zero or negative, making the timer fire at once
	complete.Reset(gatherSlack - time.Since(oldest))
}
// enqueue puts a block delivered by peer onto the import queue, unless the peer
// has exhausted its block allowance, the block is too far from the chain head,
// or the block is already queued. In the first two cases every trace of the
// announcement for the block is dropped as well.
func (f *Fetcher) enqueue(peer string, block *types.Block) {
	hash := block.Hash()
	number := block.NumberU64()

	// Guard against a single peer flooding the queue
	allowance := f.queues[peer] + 1
	if allowance > blockLimit {
		glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, number, hash.Bytes()[:4], blockLimit)
		propBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Guard against stale blocks and blocks too far in the future
	dist := int64(number) - int64(f.chainHeight())
	if dist < -maxUncleDist || dist > maxQueueDist {
		glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, number, hash.Bytes()[:4], dist)
		propBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Blocks already waiting for import are not queued twice
	if _, known := f.queued[hash]; known {
		return
	}
	op := &inject{block: block, origin: peer}

	f.queues[peer] = allowance
	f.queued[hash] = op
	// The priority is the negated block number, converted to float32 (which
	// represents integers exactly only up to 2^24)
	f.queue.Push(op, -float32(number))

	if f.queueChangeHook != nil {
		f.queueChangeHook(hash, true)
	}
	if glog.V(logger.Debug) {
		glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, number, hash.Bytes()[:4], f.queue.Size())
	}
}
// insert spawns a new goroutine to run a block insertion into the chain. The
// goroutine looks up the parent, validates the block, propagates it to peers,
// imports it and finally announces it. Whatever the outcome, the block's hash
// is reported on the done channel so the main loop can clean up its state.
//
// A block whose parent is unknown is silently abandoned; a block failing
// validation (other than being from the future) gets its peer dropped.
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	// Run the import on a new thread
	glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x…]", peer, block.NumberU64(), hash[:4])
	go func() {
		// Always report completion to the main loop, but don't block forever if
		// the fetcher was stopped in the mean time: the loop is the only receiver
		// of done, so without the quit case this goroutine would leak.
		defer func() {
			select {
			case f.done <- hash:
			case <-f.quit:
			}
		}()

		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			glog.V(logger.Debug).Infof("Peer %s: parent [%x…] of block #%d [%x…] unknown", peer, block.ParentHash().Bytes()[:4], block.NumberU64(), hash[:4])
			return
		}
		// Quickly validate the header and propagate the block if it passes
		switch err := f.validateBlock(block, parent); err {
		case nil:
			// All ok, quickly propagate to our peers
			propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)

		case core.BlockFutureErr:
			// Weird future block, don't fail, but neither propagate

		default:
			// Something went very wrong, drop the peer
			glog.V(logger.Debug).Infof("Peer %s: block #%d [%x…] verification failed: %v", peer, block.NumberU64(), hash[:4], err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			glog.V(logger.Warn).Infof("Peer %s: block #%d [%x…] import failed: %v", peer, block.NumberU64(), hash[:4], err)
			return
		}
		// If import succeeded, broadcast the block
		propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)

		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(block)
		}
	}()
}
// forgetHash removes all traces of a block announcement from the fetcher's
// internal state: the hash is dropped from the announced, fetching, fetched and
// completing sets, and the per-peer announce counter of every affected origin
// is decremented. The announce change hook, if set, is always invoked.
func (f *Fetcher) forgetHash(hash common.Hash) {
	// release hands one announce slot back to origin. Announces moved into the
	// fetching or completing sets by the main loop are not counted again, so a
	// counter may be decremented more often than it was incremented. Entries
	// are therefore dropped as soon as they reach zero or below; otherwise they
	// would linger with a negative count and inflate the peer's allowance.
	release := func(origin string) {
		f.announces[origin]--
		if f.announces[origin] <= 0 {
			delete(f.announces, origin)
		}
	}
	// Remove all pending announces and decrement DOS counters
	for _, announce := range f.announced[hash] {
		release(announce.origin)
	}
	delete(f.announced, hash)
	if f.announceChangeHook != nil {
		f.announceChangeHook(hash, false)
	}
	// Remove any pending fetches and decrement the DOS counters
	if announce := f.fetching[hash]; announce != nil {
		release(announce.origin)
		delete(f.fetching, hash)
	}
	// Remove any pending completion requests and decrement the DOS counters
	for _, announce := range f.fetched[hash] {
		release(announce.origin)
	}
	delete(f.fetched, hash)

	// Remove any pending completions and decrement the DOS counters
	if announce := f.completing[hash]; announce != nil {
		release(announce.origin)
		delete(f.completing, hash)
	}
}
// forgetBlock drops a queued block from the fetcher's bookkeeping: the hash is
// removed from the queued set and the delivering peer's block counter is
// decremented, with the counter entry deleted once it reaches zero. Unknown
// hashes are ignored.
func (f *Fetcher) forgetBlock(hash common.Hash) {
	op := f.queued[hash]
	if op == nil {
		return
	}
	if remaining := f.queues[op.origin] - 1; remaining == 0 {
		delete(f.queues, op.origin)
	} else {
		f.queues[op.origin] = remaining
	}
	delete(f.queued, hash)
}