Merge pull request #2630 from karalabe/adaptive-qos-tuning

eth/downloader: adaptive quality of service tuning
pull/2649/head
Péter Szilágyi 9 years ago
commit 826efc2295
  1. 144
      eth/downloader/downloader.go
  2. 58
      eth/downloader/downloader_test.go
  3. 79
      eth/downloader/peer.go

@ -54,14 +54,15 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request qosTuningPeers = 5 // Number of peers to tune based on (best peers)
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@ -113,7 +114,8 @@ type Downloader struct {
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails int // Number of fast sync failures in the critical section fsPivotFails int // Number of fast sync failures in the critical section
interrupt int32 // Atomic boolean to signal termination rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics // Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at syncStatsChainOrigin uint64 // Origin block number where syncing started at
@ -159,6 +161,9 @@ type Downloader struct {
cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
quitCh chan struct{} // Quit channel to signal termination
quitLock sync.RWMutex // Lock to prevent double closes
// Testing hooks // Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@ -172,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
return &Downloader{ dl := &Downloader{
mode: FullSync, mode: FullSync,
mux: mux, mux: mux,
queue: newQueue(stateDb), queue: newQueue(stateDb),
peers: newPeerSet(), peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
hasHeader: hasHeader, hasHeader: hasHeader,
hasBlockAndState: hasBlockAndState, hasBlockAndState: hasBlockAndState,
getHeader: getHeader, getHeader: getHeader,
@ -203,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
receiptWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1), headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
} }
go dl.qosTuner()
return dl
} }
// Progress retrieves the synchronisation boundaries, specifically the origin // Progress retrieves the synchronisation boundaries, specifically the origin
@ -250,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
glog.V(logger.Error).Infoln("Register failed:", err) glog.V(logger.Error).Infoln("Register failed:", err)
return err return err
} }
d.qosReduceConfidence()
return nil return nil
} }
@ -515,7 +527,16 @@ func (d *Downloader) cancel() {
// Terminate interrupts the downloader, canceling all pending operations. // Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate. // The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() { func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1) // Close the termination channel (make sure double close is allowed)
d.quitLock.Lock()
select {
case <-d.quitCh:
default:
close(d.quitCh)
}
d.quitLock.Unlock()
// Cancel any pending download requests
d.cancel() d.cancel()
} }
@ -932,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that // Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to // no more hashes are available, or that the peer is known not to
// have them. // have them.
request := d.queue.ReserveBlocks(peer, peer.BlockCapacity()) request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
if request == nil { if request == nil {
continue continue
} }
@ -973,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// Request the advertised remote head block and wait for the response // Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false) go p.getRelHeaders(p.head, 1, 0, false)
timeout := time.After(headerTTL) timeout := time.After(d.requestTTL())
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
@ -1041,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Wait for the remote response to the head fetch // Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{} number, hash := uint64(0), common.Hash{}
timeout := time.After(hashTTL) timeout := time.After(d.requestTTL())
for finished := false; !finished; { for finished := false; !finished; {
select { select {
@ -1118,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Split our chain interval in two, and request the hash to cross check // Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2 check := (start + end) / 2
timeout := time.After(hashTTL) timeout := time.After(d.requestTTL())
go p.getAbsHeaders(uint64(check), 1, 0, false) go p.getAbsHeaders(uint64(check), 1, 0, false)
// Wait until a reply arrives to this request // Wait until a reply arrives to this request
@ -1199,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
getHeaders := func(from uint64) { getHeaders := func(from uint64) {
request = time.Now() request = time.Now()
timeout.Reset(headerTTL) timeout.Reset(d.requestTTL())
if skeleton { if skeleton {
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from) glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
@ -1311,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack) pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
} }
expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false } throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) { reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil return d.queue.ReserveHeaders(p, count), false, nil
} }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peer) int { return p.HeaderCapacity() } capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
) )
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
@ -1341,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
pack := packet.(*bodyPack) pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
} }
expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) } expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() } capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
) )
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
@ -1365,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
pack := packet.(*receiptPack) pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts) return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
} }
expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) } expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() } capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
) )
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
@ -1417,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error {
} }
}) })
} }
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
throttle = func() bool { return false } throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) { reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil return d.queue.ReserveNodeData(p, count), false, nil
} }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() } capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
) )
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
@ -1799,8 +1820,10 @@ func (d *Downloader) processContent() error {
} }
for len(results) != 0 { for len(results) != 0 {
// Check for any termination requests // Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 { select {
case <-d.quitCh:
return errCancelContentProcessing return errCancelContentProcessing
default:
} }
// Retrieve the a batch of results to import // Retrieve the a batch of results to import
var ( var (
@ -1901,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
return errNoSyncActive return errNoSyncActive
} }
} }
// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
for {
// Retrieve the current median RTT and integrate into the previoust target RTT
rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
// A new RTT cycle passed, increase our confidence in the estimated RTT
conf := atomic.LoadUint64(&d.rttConfidence)
conf = conf + (1000000-conf)/2
atomic.StoreUint64(&d.rttConfidence, conf)
// Log the new QoS values and sleep until the next RTT
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
select {
case <-d.quitCh:
return
case <-time.After(rtt):
}
}
}
// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
// If we have a single peer, confidence is always 1
peers := uint64(d.peers.Len())
if peers == 1 {
atomic.StoreUint64(&d.rttConfidence, 1000000)
return
}
// If we have a ton of peers, don't drop confidence)
if peers >= uint64(qosConfidenceCap) {
return
}
// Otherwise drop the confidence factor
conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
if float64(conf)/1000000 < rttMinConfidence {
conf = uint64(rttMinConfidence * 1000000)
}
atomic.StoreUint64(&d.rttConfidence, conf)
rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
}
// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
// be adapted to, but smaller ones are preffered (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}
// requestTTL returns the current timeout allowance for a single download request
// to finish under.
func (d *Downloader) requestTTL() time.Duration {
var (
rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
)
ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
if ttl > ttlLimit {
ttl = ttlLimit
}
return ttl
}

@ -179,6 +179,12 @@ func newTester() *downloadTester {
return tester return tester
} }
// terminate aborts any operations on the embedded downloader and releases all
// held resources.
func (dl *downloadTester) terminate() {
dl.downloader.Terminate()
}
// sync starts synchronizing with a remote peer, blocking until it completes. // sync starts synchronizing with a remote peer, blocking until it completes.
func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
dl.lock.RLock() dl.lock.RLock()
@ -740,6 +746,8 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Synchronise with the peer and make sure all relevant data was retrieved // Synchronise with the peer and make sure all relevant data was retrieved
@ -764,6 +772,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Wrap the importer to allow stepping // Wrap the importer to allow stepping
@ -851,6 +861,8 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)
@ -885,6 +897,8 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB) tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB)
@ -934,6 +948,8 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB) tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB)
@ -968,6 +984,8 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit
@ -987,7 +1005,9 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
// bodies. // bodies.
func TestInactiveDownloader62(t *testing.T) { func TestInactiveDownloader62(t *testing.T) {
t.Parallel() t.Parallel()
tester := newTester() tester := newTester()
defer tester.terminate()
// Check that neither block headers nor bodies are accepted // Check that neither block headers nor bodies are accepted
if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
@ -1002,7 +1022,9 @@ func TestInactiveDownloader62(t *testing.T) {
// bodies and receipts. // bodies and receipts.
func TestInactiveDownloader63(t *testing.T) { func TestInactiveDownloader63(t *testing.T) {
t.Parallel() t.Parallel()
tester := newTester() tester := newTester()
defer tester.terminate()
// Check that neither block headers nor bodies are accepted // Check that neither block headers nor bodies are accepted
if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
@ -1039,6 +1061,8 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Make sure canceling works with a pristine downloader // Make sure canceling works with a pristine downloader
@ -1074,6 +1098,8 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
for i := 0; i < targetPeers; i++ { for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i) id := fmt.Sprintf("peer #%d", i)
tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts)
@ -1103,6 +1129,8 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
// Create peers of every type // Create peers of every type
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("peer 61", 61, hashes, nil, blocks, nil) tester.newPeer("peer 61", 61, hashes, nil, blocks, nil)
tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) tester.newPeer("peer 62", 62, hashes, headers, blocks, nil)
tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts)
@ -1140,6 +1168,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Instrument the downloader to signal body requests // Instrument the downloader to signal body requests
@ -1193,6 +1223,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
// Attempt a full sync with an attacker feeding gapped headers // Attempt a full sync with an attacker feeding gapped headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
@ -1225,6 +1256,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
// Attempt a full sync with an attacker feeding shifted headers // Attempt a full sync with an attacker feeding shifted headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
@ -1256,6 +1288,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester() tester := newTester()
defer tester.terminate()
// Attempt to sync with an attacker that feeds junk during the fast sync phase. // Attempt to sync with an attacker that feeds junk during the fast sync phase.
// This should result in the last fsHeaderSafetyNet headers being rolled back. // This should result in the last fsHeaderSafetyNet headers being rolled back.
@ -1347,9 +1380,11 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
tester := newTester() tester := newTester()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) defer tester.terminate()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false)
tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts) tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts)
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
} }
@ -1392,6 +1427,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status
tester := newTester() tester := newTester()
defer tester.terminate()
for i, tt := range tests { for i, tt := range tests {
// Register a new peer and ensure it's presence // Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i) id := fmt.Sprintf("test %d", i)
@ -1433,6 +1470,8 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester() tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1505,6 +1544,8 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester() tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1580,6 +1621,8 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester() tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1656,6 +1699,8 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{}) progress := make(chan struct{})
tester := newTester() tester := newTester()
defer tester.terminate()
tester.downloader.syncInitHook = func(origin, latest uint64) { tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{} starting <- struct{}{}
<-progress <-progress
@ -1742,7 +1787,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
impl := tester.peerGetAbsHeadersFn("peer", 0) impl := tester.peerGetAbsHeadersFn("peer", 0)
go impl(from, count, skip, reverse) go impl(from, count, skip, reverse)
// None of the extra deliveries should block. // None of the extra deliveries should block.
timeout := time.After(5 * time.Second) timeout := time.After(15 * time.Second)
for i := 0; i < cap(deliveriesDone); i++ { for i := 0; i < cap(deliveriesDone); i++ {
select { select {
case <-deliveriesDone: case <-deliveriesDone:
@ -1755,6 +1800,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("peer", nil, mode); err != nil { if err := tester.sync("peer", nil, mode); err != nil {
t.Errorf("sync failed: %v", err) t.Errorf("sync failed: %v", err)
} }
tester.terminate()
} }
} }
@ -1772,8 +1818,9 @@ func testFastCriticalRestarts(t *testing.T, protocol int) {
// Create a tester peer with the critical section state roots missing (force failures) // Create a tester peer with the critical section state roots missing (force failures)
tester := newTester() tester := newTester()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) defer tester.terminate()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
for i := 0; i < fsPivotInterval; i++ { for i := 0; i < fsPivotInterval; i++ {
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
} }
@ -1783,11 +1830,14 @@ func testFastCriticalRestarts(t *testing.T, protocol int) {
if err := tester.sync("peer", nil, FastSync); err == nil { if err := tester.sync("peer", nil, FastSync); err == nil {
t.Fatalf("failing fast sync succeeded: %v", err) t.Fatalf("failing fast sync succeeded: %v", err)
} }
time.Sleep(500 * time.Millisecond) // Make sure no in-flight requests remain
// If it's the first failure, pivot should be locked => reenable all others to detect pivot changes // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
if i == 0 { if i == 0 {
tester.lock.Lock()
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
tester.lock.Unlock()
} }
time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain
} }
// Retry limit exhausted, downloader will switch to full sync, should succeed // Retry limit exhausted, downloader will switch to full sync, should succeed
if err := tester.sync("peer", nil, FastSync); err != nil { if err := tester.sync("peer", nil, FastSync); err != nil {

@ -23,6 +23,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
"sort"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -32,7 +34,7 @@ import (
const ( const (
maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
) )
// Hash and block fetchers belonging to eth/61 and below // Hash and block fetchers belonging to eth/61 and below
@ -68,6 +70,8 @@ type peer struct {
receiptThroughput float64 // Number of receipts measured to be retrievable per second receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second stateThroughput float64 // Number of node data pieces measured to be retrievable per second
rtt time.Duration // Request round trip time to track responsiveness (QoS)
headerStarted time.Time // Time instance when the last header fetch was started headerStarted time.Time // Time instance when the last header fetch was started
blockStarted time.Time // Time instance when the last block (body) fetch was started blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started
@ -290,44 +294,47 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
return return
} }
// Otherwise update the throughput with a new measurement // Otherwise update the throughput with a new measurement
measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
} }
// HeaderCapacity retrieves the peers header download allowance based on its // HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) HeaderCapacity() int { func (p *peer) HeaderCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch)))) return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
} }
// BlockCapacity retrieves the peers block download allowance based on its // BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) BlockCapacity() int { func (p *peer) BlockCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch)))) return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch)))
} }
// ReceiptCapacity retrieves the peers receipt download allowance based on its // ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) ReceiptCapacity() int { func (p *peer) ReceiptCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch)))) return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch)))
} }
// NodeDataCapacity retrieves the peers state download allowance based on its // NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) NodeDataCapacity() int { func (p *peer) NodeDataCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch)))) return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch)))
} }
// MarkLacking appends a new entity to the set of items (blocks, receipts, states) // MarkLacking appends a new entity to the set of items (blocks, receipts, states)
@ -361,13 +368,14 @@ func (p *peer) String() string {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id, return fmt.Sprintf("Peer %s [%s]", p.id, strings.Join([]string{
fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+ fmt.Sprintf("hs %3.2f/s", p.headerThroughput),
fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ fmt.Sprintf("bs %3.2f/s", p.blockThroughput),
fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ fmt.Sprintf("rs %3.2f/s", p.receiptThroughput),
fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ fmt.Sprintf("ss %3.2f/s", p.stateThroughput),
fmt.Sprintf("lacking %4d", len(p.lacking)), fmt.Sprintf("miss %4d", len(p.lacking)),
) fmt.Sprintf("rtt %v", p.rtt),
}, ", "))
} }
// peerSet represents the collection of active peer participating in the chain // peerSet represents the collection of active peer participating in the chain
@ -402,6 +410,10 @@ func (ps *peerSet) Reset() {
// average of all existing peers, to give it a realistic chance of being used // average of all existing peers, to give it a realistic chance of being used
// for data retrievals. // for data retrievals.
func (ps *peerSet) Register(p *peer) error { func (ps *peerSet) Register(p *peer) error {
// Retrieve the current median RTT as a sane default
p.rtt = ps.medianRTT()
// Register the new peer with some meaningful defaults
ps.lock.Lock() ps.lock.Lock()
defer ps.lock.Unlock() defer ps.lock.Unlock()
@ -564,3 +576,34 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
} }
return idle, total return idle, total
} }
// medianRTT returns the median RTT of te peerset, considering only the tuning
// peers if there are more peers available.
func (ps *peerSet) medianRTT() time.Duration {
// Gather all the currnetly measured round trip times
ps.lock.RLock()
defer ps.lock.RUnlock()
rtts := make([]float64, 0, len(ps.peers))
for _, p := range ps.peers {
p.lock.RLock()
rtts = append(rtts, float64(p.rtt))
p.lock.RUnlock()
}
sort.Float64s(rtts)
median := rttMaxEstimate
if qosTuningPeers <= len(rtts) {
median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers
} else if len(rtts) > 0 {
median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
}
// Restrict the RTT into some QoS defaults, irrelevant of true RTT
if median < rttMinEstimate {
median = rttMinEstimate
}
if median > rttMaxEstimate {
median = rttMaxEstimate
}
return median
}

Loading…
Cancel
Save