diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index b9c7716941..066a366315 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -21,7 +21,6 @@ package downloader import ( "errors" - "math" "math/big" "sort" "sync" @@ -232,7 +231,7 @@ func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) // HeaderCapacity retrieves the peers header download allowance based on its // previously discovered throughput. func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { - cap := int(math.Ceil(p.rates.Capacity(eth.BlockHeadersMsg, targetRTT))) + cap := p.rates.Capacity(eth.BlockHeadersMsg, targetRTT) if cap > MaxHeaderFetch { cap = MaxHeaderFetch } @@ -242,7 +241,7 @@ func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int { - cap := int(math.Ceil(p.rates.Capacity(eth.BlockBodiesMsg, targetRTT))) + cap := p.rates.Capacity(eth.BlockBodiesMsg, targetRTT) if cap > MaxBlockFetch { cap = MaxBlockFetch } @@ -252,7 +251,7 @@ func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int { // ReceiptCapacity retrieves the peers receipt download allowance based on its // previously discovered throughput. func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { - cap := int(math.Ceil(p.rates.Capacity(eth.ReceiptsMsg, targetRTT))) + cap := p.rates.Capacity(eth.ReceiptsMsg, targetRTT) if cap > MaxReceiptFetch { cap = MaxReceiptFetch } @@ -262,7 +261,7 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { // NodeDataCapacity retrieves the peers state download allowance based on its // previously discovered throughput. func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int { - cap := int(math.Ceil(p.rates.Capacity(eth.NodeDataMsg, targetRTT))) + cap := p.rates.Capacity(eth.NodeDataMsg, targetRTT) if cap > MaxStateFetch { cap = MaxStateFetch } @@ -411,7 +410,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.headerIdle) == 0 } - throughput := func(p *peerConnection) float64 { + throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.BlockHeadersMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) @@ -423,7 +422,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - throughput := func(p *peerConnection) float64 { + throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.BlockBodiesMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) @@ -435,7 +434,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.receiptIdle) == 0 } - throughput := func(p *peerConnection) float64 { + throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.ReceiptsMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) @@ -447,7 +446,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { idle := func(p *peerConnection) bool { return atomic.LoadInt32(&p.stateIdle) == 0 } - throughput := func(p *peerConnection) float64 { + throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.NodeDataMsg, time.Second) } return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) @@ -455,45 +454,48 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { // idlePeers retrieves a flat list of all currently idle peers satisfying the // protocol version constraints, using the provided function to check idleness. -// The resulting set of peers are sorted by their measure throughput. -func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) { +// The resulting set of peers are sorted by their capacity. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, capacity func(*peerConnection) int) ([]*peerConnection, int) { ps.lock.RLock() defer ps.lock.RUnlock() - idle, total := make([]*peerConnection, 0, len(ps.peers)), 0 - tps := make([]float64, 0, len(ps.peers)) + var ( + total = 0 + idle = make([]*peerConnection, 0, len(ps.peers)) + tps = make([]int, 0, len(ps.peers)) + ) for _, p := range ps.peers { if p.version >= minProtocol && p.version <= maxProtocol { if idleCheck(p) { idle = append(idle, p) - tps = append(tps, throughput(p)) + tps = append(tps, capacity(p)) } total++ } } + // And sort them - sortPeers := &peerThroughputSort{idle, tps} + sortPeers := &peerCapacitySort{idle, tps} sort.Sort(sortPeers) return sortPeers.p, total } -// peerThroughputSort implements the Sort interface, and allows for -// sorting a set of peers by their throughput -// The sorted data is with the _highest_ throughput first -type peerThroughputSort struct { +// peerCapacitySort implements sort.Interface. +// It sorts peer connections by capacity (descending). +type peerCapacitySort struct { p []*peerConnection - tp []float64 + tp []int } -func (ps *peerThroughputSort) Len() int { +func (ps *peerCapacitySort) Len() int { return len(ps.p) } -func (ps *peerThroughputSort) Less(i, j int) bool { +func (ps *peerCapacitySort) Less(i, j int) bool { return ps.tp[i] > ps.tp[j] } -func (ps *peerThroughputSort) Swap(i, j int) { +func (ps *peerCapacitySort) Swap(i, j int) { ps.p[i], ps.p[j] = ps.p[j], ps.p[i] ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i] } diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index c57fcd71f6..646df03887 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -861,7 +861,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac // Sort the peers by download capacity to use faster ones if many available idlers := &capacitySort{ ids: make([]string, 0, len(s.accountIdlers)), - caps: make([]float64, 0, len(s.accountIdlers)), + caps: make([]int, 0, len(s.accountIdlers)), } targetTTL := s.rates.TargetTimeout() for id := range s.accountIdlers { @@ -958,7 +958,7 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * // Sort the peers by download capacity to use faster ones if many available idlers := &capacitySort{ ids: make([]string, 0, len(s.bytecodeIdlers)), - caps: make([]float64, 0, len(s.bytecodeIdlers)), + caps: make([]int, 0, len(s.bytecodeIdlers)), } targetTTL := s.rates.TargetTimeout() for id := range s.bytecodeIdlers { @@ -1012,11 +1012,11 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * if cap > maxCodeRequestCount { cap = maxCodeRequestCount } - hashes := make([]common.Hash, 0, int(cap)) + hashes := make([]common.Hash, 0, cap) for hash := range task.codeTasks { delete(task.codeTasks, hash) hashes = append(hashes, hash) - if len(hashes) >= int(cap) { + if len(hashes) >= cap { break } } @@ -1061,7 +1061,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st // Sort the peers by download capacity to use faster ones if many available idlers := &capacitySort{ ids: make([]string, 0, len(s.storageIdlers)), - caps: make([]float64, 0, len(s.storageIdlers)), + caps: make([]int, 0, len(s.storageIdlers)), } targetTTL := s.rates.TargetTimeout() for id := range s.storageIdlers { @@ -1120,7 +1120,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st if cap < minRequestSize { // Don't bother with peers below a bare minimum performance cap = minRequestSize } - storageSets := int(cap / 1024) + storageSets := cap / 1024 var ( accounts = make([]common.Hash, 0, storageSets) @@ -1217,7 +1217,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai // Sort the peers by download capacity to use faster ones if many available idlers := &capacitySort{ ids: make([]string, 0, len(s.trienodeHealIdlers)), - caps: make([]float64, 0, len(s.trienodeHealIdlers)), + caps: make([]int, 0, len(s.trienodeHealIdlers)), } targetTTL := s.rates.TargetTimeout() for id := range s.trienodeHealIdlers { @@ -1284,9 +1284,9 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai cap = maxTrieRequestCount } var ( - hashes = make([]common.Hash, 0, int(cap)) - paths = make([]trie.SyncPath, 0, int(cap)) - pathsets = make([]TrieNodePathSet, 0, int(cap)) + hashes = make([]common.Hash, 0, cap) + paths = make([]trie.SyncPath, 0, cap) + pathsets = make([]TrieNodePathSet, 0, cap) ) for hash, pathset := range s.healer.trieTasks { delete(s.healer.trieTasks, hash) @@ -1295,7 +1295,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai paths = append(paths, pathset) pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash - if len(hashes) >= int(cap) { + if len(hashes) >= cap { break } } @@ -1341,7 +1341,7 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai // Sort the peers by download capacity to use faster ones if many available idlers := &capacitySort{ ids: make([]string, 0, len(s.bytecodeHealIdlers)), - caps: make([]float64, 0, len(s.bytecodeHealIdlers)), + caps: make([]int, 0, len(s.bytecodeHealIdlers)), } targetTTL := s.rates.TargetTimeout() for id := range s.bytecodeHealIdlers { @@ -1407,12 +1407,12 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai if cap > maxCodeRequestCount { cap = maxCodeRequestCount } - hashes := make([]common.Hash, 0, int(cap)) + hashes := make([]common.Hash, 0, cap) for hash := range s.healer.codeTasks { delete(s.healer.codeTasks, hash) hashes = append(hashes, hash) - if len(hashes) >= int(cap) { + if len(hashes) >= cap { break } } @@ -2852,7 +2852,7 @@ func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) { // of highest capacity being at the front. type capacitySort struct { ids []string - caps []float64 + caps []int } func (s *capacitySort) Len() int { diff --git a/p2p/msgrate/msgrate.go b/p2p/msgrate/msgrate.go index 7cd172c566..5bfa27b433 100644 --- a/p2p/msgrate/msgrate.go +++ b/p2p/msgrate/msgrate.go @@ -19,6 +19,7 @@ package msgrate import ( "errors" + "math" "sort" "sync" "time" @@ -162,7 +163,7 @@ func NewTracker(caps map[uint64]float64, rtt time.Duration) *Tracker { // the load proportionally to the requested items, so fetching a bit more might // still take the same RTT. By forcefully overshooting by a small amount, we can // avoid locking into a lower-that-real capacity. -func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) float64 { +func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) int { t.lock.RLock() defer t.lock.RUnlock() @@ -171,7 +172,14 @@ func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) float64 { // Return an overestimation to force the peer out of a stuck minima, adding // +1 in case the item count is too low for the overestimator to dent - return 1 + capacityOverestimation*throughput + return roundCapacity(1 + capacityOverestimation*throughput) +} + +// roundCapacity gives the integer value of a capacity. +// The result fits int32, and is guaranteed to be positive. +func roundCapacity(cap float64) int { + const maxInt32 = float64(1<<31 - 1) + return int(math.Min(maxInt32, math.Max(1, math.Ceil(cap)))) } // Update modifies the peer's capacity values for a specific data type with a new @@ -435,7 +443,7 @@ func (t *Trackers) detune() { // Capacity is a helper function to access a specific tracker without having to // track it explicitly outside. -func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) float64 { +func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) int { t.lock.RLock() defer t.lock.RUnlock() diff --git a/p2p/msgrate/msgrate_test.go b/p2p/msgrate/msgrate_test.go new file mode 100644 index 0000000000..a5c8dd0518 --- /dev/null +++ b/p2p/msgrate/msgrate_test.go @@ -0,0 +1,28 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package msgrate + +import "testing" + +func TestCapacityOverflow(t *testing.T) { + tracker := NewTracker(nil, 1) + tracker.Update(1, 1, 100000) + cap := tracker.Capacity(1, 10000000) + if int32(cap) < 0 { + t.Fatalf("Negative: %v", int32(cap)) + } +}