Merge pull request #2315 from karalabe/concurrent-headers-2

eth/downloader: concurrent header downloads
release/1.5
Jeffrey Wilcke 9 years ago
commit a8472e0fdb
  1. 443
      eth/downloader/downloader.go
  2. 8
      eth/downloader/downloader_test.go
  3. 62
      eth/downloader/peer.go
  4. 223
      eth/downloader/queue.go

@ -42,6 +42,7 @@ var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request
@ -52,6 +53,7 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
@ -60,9 +62,10 @@ var (
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxResultsProcess = 256 // Number of download results to import at once into the chain maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
@ -93,7 +96,8 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelProcessing = errors.New("processing canceled (requested)") errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errNoSyncActive = errors.New("no sync active") errNoSyncActive = errors.New("no sync active")
) )
@ -147,6 +151,7 @@ type Downloader struct {
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@ -194,6 +199,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
bodyWakeCh: make(chan bool, 1), bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
} }
} }
@ -308,6 +314,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
default: default:
} }
} }
for empty := false; !empty; {
select {
case <-d.headerProcCh:
default:
empty = true
}
}
// Reset any ephemeral sync statistics // Reset any ephemeral sync statistics
d.syncStatsLock.Lock() d.syncStatsLock.Lock()
d.syncStatsStateTotal = 0 d.syncStatsStateTotal = 0
@ -373,7 +386,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
return d.spawnSync( return d.spawnSync(origin+1,
func() error { return d.fetchHashes61(p, td, origin+1) }, func() error { return d.fetchHashes61(p, td, origin+1) },
func() error { return d.fetchBlocks61(origin + 1) }, func() error { return d.fetchBlocks61(origin + 1) },
) )
@ -423,8 +436,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
return d.spawnSync( return d.spawnSync(origin+1,
func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
@ -439,11 +453,11 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
// spawnSync runs d.process and all given fetcher functions to completion in // spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears. // separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers ...func() error) error { func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
var wg sync.WaitGroup var wg sync.WaitGroup
errc := make(chan error, len(fetchers)+1) errc := make(chan error, len(fetchers)+1)
wg.Add(len(fetchers) + 1) wg.Add(len(fetchers) + 1)
go func() { defer wg.Done(); errc <- d.process() }() go func() { defer wg.Done(); errc <- d.processContent() }()
for _, fn := range fetchers { for _, fn := range fetchers {
fn := fn fn := fn
go func() { defer wg.Done(); errc <- fn() }() go func() { defer wg.Done(); errc <- fn() }()
@ -702,9 +716,9 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
getHashes := func(from uint64) { getHashes := func(from uint64) {
glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
go p.getAbsHashes(from, MaxHashFetch)
request = time.Now() request = time.Now()
timeout.Reset(hashTTL) timeout.Reset(hashTTL)
go p.getAbsHashes(from, MaxHashFetch)
} }
// Start pulling hashes, until all are exhausted // Start pulling hashes, until all are exhausted
getHashes(from) getHashes(from)
@ -1050,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
continue continue
} }
// Otherwise check if we already know the header or not // Otherwise check if we already know the header or not
if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) { if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash() number, hash = headers[i].Number.Uint64(), headers[i].Hash()
break break
} }
@ -1149,55 +1163,39 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return start, nil return start, nil
} }
// fetchHeaders keeps retrieving headers from the requested number, until no more // fetchHeaders keeps retrieving headers concurrently from the number
// are returned, potentially throttling on the way. // requested, until no more are returned, potentially throttling on the way. To
// // facilitate concurrency but still protect against malicious nodes sending bad
// The queue parameter can be used to switch between queuing headers for block // headers, we construct a header chain skeleton using the "origin" peer we are
// body download too, or directly import as pure header chains. // syncing with, and fill in the missing headers using anyone else. Headers from
func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // other peers are only accepted if they map cleanly to the skeleton. If no one
glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) // can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
defer glog.V(logger.Debug).Infof("%v: header download terminated", p) defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
// Calculate the pivoting point for switching from fast to slow sync // Create a timeout timer, and the associated header fetcher
pivot := d.queue.FastSyncPivot() skeleton := true // Skeleton assembly phase or finishing up
request := time.Now() // time of the last skeleton fetch request
// Keep a count of uncertain headers to roll back
rollback := []*types.Header{}
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
hashes := make([]common.Hash, len(rollback))
for i, header := range rollback {
hashes[i] = header.Hash()
}
lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
d.rollback(hashes)
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
// If we're already past the pivot point, this could be an attack, disable fast sync
if rollback[len(rollback)-1].Number.Uint64() > pivot {
d.noFast = true
}
}
}()
// Create a timeout timer, and the associated hash fetcher
request := time.Now() // time of the last fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty <-timeout.C // timeout channel should be initially empty
defer timeout.Stop() defer timeout.Stop()
getHeaders := func(from uint64) { getHeaders := func(from uint64) {
glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
request = time.Now() request = time.Now()
timeout.Reset(headerTTL) timeout.Reset(headerTTL)
if skeleton {
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
} else {
glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
}
} }
// Start pulling headers, until all are exhausted // Start pulling the header chain skeleton until all is done
getHeaders(from) getHeaders(from)
gotHeaders := false
for { for {
select { select {
@ -1205,116 +1203,48 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return errCancelHeaderFetch return errCancelHeaderFetch
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers // Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId()) glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
break break
} }
headerReqTimer.UpdateSince(request) headerReqTimer.UpdateSince(request)
timeout.Stop() timeout.Stop()
// If the skeleton's finished, pull any remaining head headers directly from the origin
if packet.Items() == 0 && skeleton {
skeleton = false
getHeaders(from)
continue
}
// If no more headers are inbound, notify the content fetchers and return // If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 { if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p) glog.V(logger.Debug).Infof("%v: no available headers", p)
d.headerProcCh <- nil
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
// If no headers were retrieved at all, the peer violated it's TD promise that it had a
// better chain compared to ours. The only exception is if it's promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
return errStallingPeer
}
// If fast or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
return errStallingPeer
}
}
rollback = nil
return nil return nil
} }
gotHeaders = true
headers := packet.(*headerPack).headers headers := packet.(*headerPack).headers
// Otherwise insert all the new headers, aborting in case of junk // If we received a skeleton batch, resolve internals concurrently
glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if d.mode == FastSync || d.mode == LightSync { if err != nil {
// Collect the yet unknown headers to mark them as uncertain glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
unknown := make([]*types.Header, 0, len(headers))
for _, header := range headers {
if !d.hasHeader(header.Hash()) {
unknown = append(unknown, header)
}
}
// If we're importing pure headers, verify based on their recentness
frequency := fsHeaderCheckFrequency
if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.insertHeaders(headers, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollback = append(rollback, headers[:n]...)
}
glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
return errInvalidChain return errInvalidChain
} }
// All verifications passed, store newly found uncertain headers headers = filled[proced:]
rollback = append(rollback, unknown...) from += uint64(proced)
if len(rollback) > fsHeaderSafetyNet {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
if d.mode == FullSync || d.mode == FastSync {
inserts := d.queue.Schedule(headers, from)
if len(inserts) != len(headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer
} }
} // Insert all the new headers and fetch the next batch
// Notify the content fetchers of new headers, but stop if queue is full if len(headers) > 0 {
cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
if cont {
// We still have headers to fetch, send continuation wake signal (potential)
select {
case ch <- true:
default:
}
} else {
// Header limit reached, send a termination wake signal (enforced)
select { select {
case ch <- false: case d.headerProcCh <- headers:
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch
} }
}
}
if !cont {
return nil
}
// Queue not yet full, fetch the next batch
from += uint64(len(headers)) from += uint64(len(headers))
}
getHeaders(from) getHeaders(from)
case <-timeout.C: case <-timeout.C:
@ -1330,7 +1260,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
} }
} }
return nil select {
case d.headerProcCh <- nil:
case <-d.cancelCh:
}
return errBadPeer
case <-d.hashCh: case <-d.hashCh:
case <-d.blockCh: case <-d.blockCh:
@ -1340,6 +1274,43 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
} }
} }
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
// The method returs the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
d.queue.ScheduleSkeleton(from, skeleton)
var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
}
expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peer) int { return p.HeaderCapacity() }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
filled, proced := d.queue.RetrieveHeaders()
return filled, proced, err
}
// fetchBodies iteratively downloads the scheduled block bodies, taking any // fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery // available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts. // and also periodically checking for timeouts.
@ -1398,6 +1369,11 @@ func (d *Downloader) fetchNodeData() error {
deliver = func(packet dataPack) (int, error) { deliver = func(packet dataPack) (int, error) {
start := time.Now() start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
// If the peer gave us nothing, stalling fast sync, drop
if delivered == 0 {
glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
d.dropPeer(packet.PeerId())
}
if err != nil { if err != nil {
// If the node data processing failed, the root hash is very wrong, abort // If the node data processing failed, the root hash is very wrong, abort
glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err) glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
@ -1438,6 +1414,28 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available // fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and // peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts. // also periodically checking for timeouts.
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
// - pending: task callback for the number of requests still needing download (detect completion/non-completability)
// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
// - fetch: network callback to actually send a particular download request to a physical remote peer
// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
// - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping)
// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
@ -1554,7 +1552,9 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
continue continue
} }
if glog.V(logger.Detail) { if glog.V(logger.Detail) {
if len(request.Headers) > 0 { if request.From > 0 {
glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
} else if len(request.Headers) > 0 {
glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
} else { } else {
glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
@ -1588,9 +1588,162 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
} }
} }
// process takes fetch results from the queue and tries to import them into the // processHeaders takes batches of retrieved headers from an input channel and
// chain. The type of import operation will depend on the result contents. // keeps processing and scheduling them into the header chain and downloader's
func (d *Downloader) process() error { // queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Calculate the pivoting point for switching from fast to slow sync
pivot := d.queue.FastSyncPivot()
// Keep a count of uncertain headers to roll back
rollback := []*types.Header{}
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
hashes := make([]common.Hash, len(rollback))
for i, header := range rollback {
hashes[i] = header.Hash()
}
lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
d.rollback(hashes)
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
// If we're already past the pivot point, this could be an attack, disable fast sync
if rollback[len(rollback)-1].Number.Uint64() > pivot {
d.noFast = true
}
}
}()
// Wait for batches of headers to process
gotHeaders := false
for {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
case headers := <-d.headerProcCh:
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
// If no headers were retrieved at all, the peer violated it's TD promise that it had a
// better chain compared to ours. The only exception is if it's promised blocks were
// already imported by other means (e.g. fecher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
return errStallingPeer
}
// If fast or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
return errStallingPeer
}
}
// Disable any rollback and return
rollback = nil
return nil
}
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
default:
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
if limit > len(headers) {
limit = len(headers)
}
chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
for _, header := range chunk {
if !d.hasHeader(header.Hash()) {
unknown = append(unknown, header)
}
}
// If we're importing pure headers, verify based on their recentness
frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.insertHeaders(chunk, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollback = append(rollback, chunk[:n]...)
}
glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
return errInvalidChain
}
// All verifications passed, store newly found uncertain headers
rollback = append(rollback, unknown...)
if len(rollback) > fsHeaderSafetyNet {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
case <-time.After(time.Second):
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
glog.V(logger.Debug).Infof("stale headers")
return errBadPeer
}
}
headers = headers[limit:]
origin += uint64(limit)
}
// Signal the content downloaders of the availablility of new tasks
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- true:
default:
}
}
}
}
}
// processContent takes fetch results from the queue and tries to import them
// into the chain. The type of import operation will depend on the result contents.
func (d *Downloader) processContent() error {
pivot := d.queue.FastSyncPivot() pivot := d.queue.FastSyncPivot()
for { for {
results := d.queue.WaitResults() results := d.queue.WaitResults()
@ -1608,7 +1761,7 @@ func (d *Downloader) process() error {
for len(results) != 0 { for len(results) != 0 {
// Check for any termination requests // Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 { if atomic.LoadInt32(&d.interrupt) == 1 {
return errCancelProcessing return errCancelContentProcessing
} }
// Retrieve the a batch of results to import // Retrieve the a batch of results to import
var ( var (

@ -560,8 +560,8 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu
hashes := dl.peerHashes[id] hashes := dl.peerHashes[id]
headers := dl.peerHeaders[id] headers := dl.peerHeaders[id]
result := make([]*types.Header, 0, amount) result := make([]*types.Header, 0, amount)
for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ { for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ {
if header, ok := headers[hashes[len(hashes)-int(origin)-1-i]]; ok { if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok {
result = append(result, header) result = append(result, header)
} }
} }
@ -1258,6 +1258,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
// rolled back, and also the pivot point being reverted to a non-block status. // rolled back, and also the pivot point being reverted to a non-block status.
tester.newPeer("block-attack", protocol, hashes, headers, blocks, receipts) tester.newPeer("block-attack", protocol, hashes, headers, blocks, receipts)
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
delete(tester.peerHeaders["fast-attack"], hashes[len(hashes)-missing]) // Make sure the fast-attacker doesn't fill in
delete(tester.peerHeaders["block-attack"], hashes[len(hashes)-missing]) delete(tester.peerHeaders["block-attack"], hashes[len(hashes)-missing])
if err := tester.sync("block-attack", nil, mode); err == nil { if err := tester.sync("block-attack", nil, mode); err == nil {
@ -1368,7 +1369,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status
tester := newTester() tester := newTester()

@ -58,14 +58,17 @@ type peer struct {
id string // Unique identifier of the peer id string // Unique identifier of the peer
head common.Hash // Hash of the peers latest known block head common.Hash // Hash of the peers latest known block
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
headerThroughput float64 // Number of headers measured to be retrievable per second
blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
receiptThroughput float64 // Number of receipts measured to be retrievable per second receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second stateThroughput float64 // Number of node data pieces measured to be retrievable per second
headerStarted time.Time // Time instance when the last header fetch was started
blockStarted time.Time // Time instance when the last block (body) fetch was started blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started stateStarted time.Time // Time instance when the last node data fetch was started
@ -118,10 +121,12 @@ func (p *peer) Reset() {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
atomic.StoreInt32(&p.headerIdle, 0)
atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.stateIdle, 0) atomic.StoreInt32(&p.stateIdle, 0)
p.headerThroughput = 0
p.blockThroughput = 0 p.blockThroughput = 0
p.receiptThroughput = 0 p.receiptThroughput = 0
p.stateThroughput = 0 p.stateThroughput = 0
@ -151,6 +156,24 @@ func (p *peer) Fetch61(request *fetchRequest) error {
return nil return nil
} }
// FetchHeaders sends a header retrieval request to the remote peer.
func (p *peer) FetchHeaders(from uint64, count int) error {
// Sanity check the protocol version
if p.version < 62 {
panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
}
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
return errAlreadyFetching
}
p.headerStarted = time.Now()
// Issue the header retrieval request (absolut upwards without gaps)
go p.getAbsHeaders(from, count, 0, false)
return nil
}
// FetchBodies sends a block body retrieval request to the remote peer. // FetchBodies sends a block body retrieval request to the remote peer.
func (p *peer) FetchBodies(request *fetchRequest) error { func (p *peer) FetchBodies(request *fetchRequest) error {
// Sanity check the protocol version // Sanity check the protocol version
@ -217,6 +240,13 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil return nil
} }
// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
// requests. Its estimated header retrieval throughput is updated with that measured
// just now.
func (p *peer) SetHeadersIdle(delivered int) {
p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
}
// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
// requests. Its estimated block retrieval throughput is updated with that measured // requests. Its estimated block retrieval throughput is updated with that measured
// just now. // just now.
@ -264,6 +294,15 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
} }
// HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput.
func (p *peer) HeaderCapacity() int {
p.lock.RLock()
defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch))))
}
// BlockCapacity retrieves the peers block download allowance based on its // BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) BlockCapacity() int { func (p *peer) BlockCapacity() int {
@ -323,6 +362,7 @@ func (p *peer) String() string {
defer p.lock.RUnlock() defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id, return fmt.Sprintf("Peer %s [%s]", p.id,
fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+
fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
@ -330,7 +370,7 @@ func (p *peer) String() string {
) )
} }
// peerSet represents the collection of active peer participating in the block // peerSet represents the collection of active peer participating in the chain
// download procedure. // download procedure.
type peerSet struct { type peerSet struct {
peers map[string]*peer peers map[string]*peer
@ -359,7 +399,7 @@ func (ps *peerSet) Reset() {
// peer is already known. // peer is already known.
// //
// The method also sets the starting throughput values of the new peer to the // The method also sets the starting throughput values of the new peer to the
// average of all existing peers, to give it a realistic change of being used // average of all existing peers, to give it a realistic chance of being used
// for data retrievals. // for data retrievals.
func (ps *peerSet) Register(p *peer) error { func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock() ps.lock.Lock()
@ -369,15 +409,17 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered return errAlreadyRegistered
} }
if len(ps.peers) > 0 { if len(ps.peers) > 0 {
p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0 p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
for _, peer := range ps.peers { for _, peer := range ps.peers {
peer.lock.RLock() peer.lock.RLock()
p.headerThroughput += peer.headerThroughput
p.blockThroughput += peer.blockThroughput p.blockThroughput += peer.blockThroughput
p.receiptThroughput += peer.receiptThroughput p.receiptThroughput += peer.receiptThroughput
p.stateThroughput += peer.stateThroughput p.stateThroughput += peer.stateThroughput
peer.lock.RUnlock() peer.lock.RUnlock()
} }
p.headerThroughput /= float64(len(ps.peers))
p.blockThroughput /= float64(len(ps.peers)) p.blockThroughput /= float64(len(ps.peers))
p.receiptThroughput /= float64(len(ps.peers)) p.receiptThroughput /= float64(len(ps.peers))
p.stateThroughput /= float64(len(ps.peers)) p.stateThroughput /= float64(len(ps.peers))
@ -441,6 +483,20 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
return ps.idlePeers(61, 61, idle, throughput) return ps.idlePeers(61, 61, idle, throughput)
} }
// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
// within the active peer set, ordered by their reputation.
func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.headerIdle) == 0
}
throughput := func(p *peer) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(62, 64, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// the active peer set, ordered by their reputation. // the active peer set, ordered by their reputation.
func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {

@ -40,7 +40,7 @@ import (
var ( var (
blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
maxInFlightStates = 4096 // Maximum number of state downloads to allow concurrently maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
) )
var ( var (
@ -52,6 +52,7 @@ var (
// fetchRequest is a currently running data retrieval operation. // fetchRequest is a currently running data retrieval operation.
type fetchRequest struct { type fetchRequest struct {
Peer *peer // Peer to which the request was sent Peer *peer // Peer to which the request was sent
From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made Time time.Time // Time when the request was made
@ -79,6 +80,18 @@ type queue struct {
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
// Headers are "special", they download in batches, supported by a skeleton chain
headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches
headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
headerProced int // [eth/62] Number of headers already processed from the results
headerOffset uint64 // [eth/62] Number of the first header in the result cache
headerContCh chan bool // [eth/62] Channel to notify when header download finishes
// All data retrievals below are based on an already assembles header chain
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
@ -113,6 +126,8 @@ func newQueue(stateDb ethdb.Database) *queue {
return &queue{ return &queue{
hashPool: make(map[common.Hash]int), hashPool: make(map[common.Hash]int),
hashQueue: prque.New(), hashQueue: prque.New(),
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header), blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(), blockTaskQueue: prque.New(),
blockPendPool: make(map[string]*fetchRequest), blockPendPool: make(map[string]*fetchRequest),
@ -149,6 +164,8 @@ func (q *queue) Reset() {
q.headerHead = common.Hash{} q.headerHead = common.Hash{}
q.headerPendPool = make(map[string]*fetchRequest)
q.blockTaskPool = make(map[common.Hash]*types.Header) q.blockTaskPool = make(map[common.Hash]*types.Header)
q.blockTaskQueue.Reset() q.blockTaskQueue.Reset()
q.blockPendPool = make(map[string]*fetchRequest) q.blockPendPool = make(map[string]*fetchRequest)
@ -178,6 +195,14 @@ func (q *queue) Close() {
q.active.Broadcast() q.active.Broadcast()
} }
// PendingHeaders retrieves the number of header requests pending for retrieval.
func (q *queue) PendingHeaders() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.headerTaskQueue.Size()
}
// PendingBlocks retrieves the number of block (body) requests pending for retrieval. // PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int { func (q *queue) PendingBlocks() int {
q.lock.Lock() q.lock.Lock()
@ -205,6 +230,15 @@ func (q *queue) PendingNodeData() int {
return 0 return 0
} }
// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.headerPendPool) > 0
}
// InFlightBlocks retrieves whether there are block fetch requests currently in // InFlightBlocks retrieves whether there are block fetch requests currently in
// flight. // flight.
func (q *queue) InFlightBlocks() bool { func (q *queue) InFlightBlocks() bool {
@ -317,6 +351,45 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
return inserts return inserts
} }
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
q.lock.Lock()
defer q.lock.Unlock()
// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
if q.headerResults != nil {
panic("skeleton assembly already in progress")
}
// Shedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New()
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
q.headerOffset = from
q.headerContCh = make(chan bool, 1)
for i, header := range skeleton {
index := from + uint64(i*MaxHeaderFetch)
q.headerTaskPool[index] = header
q.headerTaskQueue.Push(index, -float32(index))
}
}
// RetrieveHeaders retrieves the header chain assemble based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
q.lock.Lock()
defer q.lock.Unlock()
headers, proced := q.headerResults, q.headerProced
q.headerResults, q.headerProced = nil, 0
return headers, proced
}
// Schedule adds a set of headers for the download queue for scheduling, returning // Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered. // the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
@ -437,6 +510,46 @@ func (q *queue) countProcessableItems() int {
return len(q.resultCache) return len(q.resultCache)
} }
// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the peer's already downloading something (sanity check to
// not corrupt state)
if _, ok := q.headerPendPool[p.id]; ok {
return nil
}
// Retrieve a batch of hashes, skipping previously failed ones
send, skip := uint64(0), []uint64{}
for send == 0 && !q.headerTaskQueue.Empty() {
from, _ := q.headerTaskQueue.Pop()
if q.headerPeerMiss[p.id] != nil {
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
skip = append(skip, from.(uint64))
continue
}
}
send = from.(uint64)
}
// Merge all the skipped batches back
for _, from := range skip {
q.headerTaskQueue.Push(from, -float32(from))
}
// Assemble and return the block download request
if send == 0 {
return nil
}
request := &fetchRequest{
Peer: p,
From: send,
Time: time.Now(),
}
q.headerPendPool[p.id] = request
return request
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any // ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download. // previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
@ -635,6 +748,11 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
return request, progress, nil return request, progress, nil
} }
// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
func (q *queue) CancelHeaders(request *fetchRequest) {
q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}
// CancelBlocks aborts a fetch request, returning all pending hashes to the queue. // CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
func (q *queue) CancelBlocks(request *fetchRequest) { func (q *queue) CancelBlocks(request *fetchRequest) {
q.cancel(request, q.hashQueue, q.blockPendPool) q.cancel(request, q.hashQueue, q.blockPendPool)
@ -663,6 +781,9 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
}
for hash, index := range request.Hashes { for hash, index := range request.Hashes {
taskQueue.Push(hash, float32(index)) taskQueue.Push(hash, float32(index))
} }
@ -702,6 +823,15 @@ func (q *queue) Revoke(peerId string) {
} }
} }
// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation. // canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int { func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
@ -753,6 +883,9 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
timeoutMeter.Mark(1) timeoutMeter.Mark(1)
// Return any non satisfied requests to the pool // Return any non satisfied requests to the pool
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
}
for hash, index := range request.Hashes { for hash, index := range request.Hashes {
taskQueue.Push(hash, float32(index)) taskQueue.Push(hash, float32(index))
} }
@ -842,6 +975,94 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
} }
} }
// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.headerPendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
headerReqTimer.UpdateSince(request.Time)
delete(q.headerPendPool, id)
// Ensure headers can be mapped onto the skeleton chain
target := q.headerTaskPool[request.From].Hash()
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
accepted = false
} else if headers[len(headers)-1].Hash() != target {
glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
accepted = false
}
}
if accepted {
for i, header := range headers[1:] {
hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
accepted = false
break
}
if headers[i].Hash() != header.ParentHash {
glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
accepted = false
break
}
}
}
// If the batch of headers wasn't accepted, mark as unavailable
if !accepted {
glog.V(logger.Detail).Infof("Peer %s: skeleton filling from header #%d not accepted", id, request.From)
miss := q.headerPeerMiss[id]
if miss == nil {
q.headerPeerMiss[id] = make(map[uint64]struct{})
miss = q.headerPeerMiss[id]
}
miss[request.From] = struct{}{}
q.headerTaskQueue.Push(request.From, -float32(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
copy(q.headerResults[request.From-q.headerOffset:], headers)
delete(q.headerTaskPool, request.From)
ready := 0
for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
ready += MaxHeaderFetch
}
if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking)
process := make([]*types.Header, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
select {
case headerProcCh <- process:
glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number)
q.headerProced += len(process)
default:
}
}
// Check for termination and return
if len(q.headerTaskPool) == 0 {
q.headerContCh <- false
}
return len(headers), nil
}
// DeliverBodies injects a block body retrieval response into the results queue. // DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of blocks bodies accepted from the delivery and // The method returns the number of blocks bodies accepted from the delivery and
// also wakes any threads waiting for data delivery. // also wakes any threads waiting for data delivery.

Loading…
Cancel
Save