eth/protocols/snap: use saner limits, sort idle peers by cap

msgrater
Péter Szilágyi 4 years ago
parent 5476b43f7e
commit 2e2c72891a
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
  1. 236
      eth/protocols/snap/sync.go

@ -55,12 +55,12 @@ const (
// minRequestSize is the minimum number of bytes to request from a remote peer.
// This number is used as the low cap for account and storage range requests.
// Bytecode and trienode are limited inherently by item count (1).
minRequestSize = 16 * 1024
minRequestSize = 64 * 1024
// maxRequestSize is the maximum number of bytes to request from a remote peer.
// This number is used as the high cap for account and storage range requests.
// Bytecode and trienode are limited more explicitly by the caps below.
maxRequestSize = 2 * 1024 * 1024
maxRequestSize = 512 * 1024
// maxCodeRequestCount is the maximum number of bytecode blobs to request in a
// single query. If this number is too low, we're not filling responses fully
@ -858,10 +858,24 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
s.lock.Lock()
defer s.lock.Unlock()
// If there are no idle peers, short circuit assignment
if len(s.accountIdlers) == 0 {
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.accountIdlers)),
caps: make([]float64, 0, len(s.accountIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.accountIdlers {
if _, ok := s.statelessPeers[id]; ok {
continue
}
idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL))
}
if len(idlers.ids) == 0 {
return
}
sort.Sort(idlers)
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
// Skip any tasks already filling
@ -871,20 +885,15 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
// Abort the entire assignment mechanism.
var idle string
for id := range s.accountIdlers {
// If the peer rejected a query in this sync cycle, don't bother asking
// again for anything, it's either out of sync or already pruned
if _, ok := s.statelessPeers[id]; ok {
continue
}
idle = id
break
}
if idle == "" {
if len(idlers.ids) == 0 {
return
}
peer := s.peers[idle]
var (
idle = idlers.ids[0]
peer = s.peers[idle]
cap = idlers.caps[0]
)
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@ -924,14 +933,13 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
defer s.pend.Done()
// Attempt to send the remote request and revert if it fails
cap := uint64(s.rates.Capacity(idle, AccountRangeMsg, s.rates.TargetRoundTrip()))
if cap > maxRequestSize {
cap = maxRequestSize
}
if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
cap = minRequestSize
}
if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, cap); err != nil {
if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, uint64(cap)); err != nil {
peer.Log().Debug("Failed to request account range", "err", err)
s.scheduleRevertAccountRequest(req)
}
@ -947,10 +955,24 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
s.lock.Lock()
defer s.lock.Unlock()
// If there are no idle peers, short circuit assignment
if len(s.bytecodeIdlers) == 0 {
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.bytecodeIdlers)),
caps: make([]float64, 0, len(s.bytecodeIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.bytecodeIdlers {
if _, ok := s.statelessPeers[id]; ok {
continue
}
idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
}
if len(idlers.ids) == 0 {
return
}
sort.Sort(idlers)
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
// Skip any tasks not in the bytecode retrieval phase
@ -964,20 +986,15 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
// Abort the entire assignment mechanism.
var idle string
for id := range s.bytecodeIdlers {
// If the peer rejected a query in this sync cycle, don't bother asking
// again for anything, it's either out of sync or already pruned
if _, ok := s.statelessPeers[id]; ok {
continue
}
idle = id
break
}
if idle == "" {
if len(idlers.ids) == 0 {
return
}
peer := s.peers[idle]
var (
idle = idlers.ids[0]
peer = s.peers[idle]
cap = idlers.caps[0]
)
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@ -992,15 +1009,14 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
break
}
// Generate the network query and send it to the peer
cap := int(s.rates.Capacity(idle, ByteCodesMsg, s.rates.TargetRoundTrip()))
if cap > maxCodeRequestCount {
cap = maxCodeRequestCount
}
hashes := make([]common.Hash, 0, cap)
hashes := make([]common.Hash, 0, int(cap))
for hash := range task.codeTasks {
delete(task.codeTasks, hash)
hashes = append(hashes, hash)
if len(hashes) >= cap {
if len(hashes) >= int(cap) {
break
}
}
@ -1042,10 +1058,24 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
s.lock.Lock()
defer s.lock.Unlock()
// If there are no idle peers, short circuit assignment
if len(s.storageIdlers) == 0 {
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.storageIdlers)),
caps: make([]float64, 0, len(s.storageIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.storageIdlers {
if _, ok := s.statelessPeers[id]; ok {
continue
}
idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL))
}
if len(idlers.ids) == 0 {
return
}
sort.Sort(idlers)
// Iterate over all the tasks and try to find a pending one
for _, task := range s.tasks {
// Skip any tasks not in the storage retrieval phase
@ -1059,20 +1089,15 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
// Abort the entire assignment mechanism.
var idle string
for id := range s.storageIdlers {
// If the peer rejected a query in this sync cycle, don't bother asking
// again for anything, it's either out of sync or already pruned
if _, ok := s.statelessPeers[id]; ok {
continue
}
idle = id
break
}
if idle == "" {
if len(idlers.ids) == 0 {
return
}
peer := s.peers[idle]
var (
idle = idlers.ids[0]
peer = s.peers[idle]
cap = idlers.caps[0]
)
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@ -1089,7 +1114,6 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
// Generate the network query and send it to the peer. If there are
// large contract tasks pending, complete those before diving into
// even more new contracts.
cap := uint64(s.rates.Capacity(idle, StorageRangesMsg, s.rates.TargetRoundTrip()))
if cap > maxRequestSize {
cap = maxRequestSize
}
@ -1171,7 +1195,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
if subtask != nil {
origin, limit = req.origin[:], req.limit[:]
}
if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, cap); err != nil {
if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, uint64(cap)); err != nil {
log.Debug("Failed to request storage", "err", err)
s.scheduleRevertStorageRequest(req)
}
@ -1190,10 +1214,24 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
s.lock.Lock()
defer s.lock.Unlock()
// If there are no idle peers, short circuit assignment
if len(s.trienodeHealIdlers) == 0 {
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.trienodeHealIdlers)),
caps: make([]float64, 0, len(s.trienodeHealIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.trienodeHealIdlers {
if _, ok := s.statelessPeers[id]; ok {
continue
}
idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL))
}
if len(idlers.ids) == 0 {
return
}
sort.Sort(idlers)
// Iterate over pending tasks and try to find a peer to retrieve with
for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
// If there are not enough trie tasks queued to fully assign, fill the
@ -1219,20 +1257,15 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
// Abort the entire assignment mechanism.
var idle string
for id := range s.trienodeHealIdlers {
// If the peer rejected a query in this sync cycle, don't bother asking
// again for anything, it's either out of sync or already pruned
if _, ok := s.statelessPeers[id]; ok {
continue
}
idle = id
break
}
if idle == "" {
if len(idlers.ids) == 0 {
return
}
peer := s.peers[idle]
var (
idle = idlers.ids[0]
peer = s.peers[idle]
cap = idlers.caps[0]
)
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@ -1247,14 +1280,13 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
break
}
// Generate the network query and send it to the peer
cap := int(s.rates.Capacity(idle, TrieNodesMsg, s.rates.TargetRoundTrip()))
if cap > maxTrieRequestCount {
cap = maxTrieRequestCount
}
var (
hashes = make([]common.Hash, 0, cap)
paths = make([]trie.SyncPath, 0, cap)
pathsets = make([]TrieNodePathSet, 0, cap)
hashes = make([]common.Hash, 0, int(cap))
paths = make([]trie.SyncPath, 0, int(cap))
pathsets = make([]TrieNodePathSet, 0, int(cap))
)
for hash, pathset := range s.healer.trieTasks {
delete(s.healer.trieTasks, hash)
@ -1263,7 +1295,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
paths = append(paths, pathset)
pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
if len(hashes) >= cap {
if len(hashes) >= int(cap) {
break
}
}
@ -1306,10 +1338,24 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
s.lock.Lock()
defer s.lock.Unlock()
// If there are no idle peers, short circuit assignment
if len(s.bytecodeHealIdlers) == 0 {
// Sort the peers by download capacity to use faster ones if many available
idlers := &capacitySort{
ids: make([]string, 0, len(s.bytecodeHealIdlers)),
caps: make([]float64, 0, len(s.bytecodeHealIdlers)),
}
targetTTL := s.rates.TargetTimeout()
for id := range s.bytecodeHealIdlers {
if _, ok := s.statelessPeers[id]; ok {
continue
}
idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
}
if len(idlers.ids) == 0 {
return
}
sort.Sort(idlers)
// Iterate over pending tasks and try to find a peer to retrieve with
for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
// If there are not enough trie tasks queued to fully assign, fill the
@ -1335,20 +1381,15 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
// Task pending retrieval, try to find an idle peer. If no such peer
// exists, we probably assigned tasks for all (or they are stateless).
// Abort the entire assignment mechanism.
var idle string
for id := range s.bytecodeHealIdlers {
// If the peer rejected a query in this sync cycle, don't bother asking
// again for anything, it's either out of sync or already pruned
if _, ok := s.statelessPeers[id]; ok {
continue
}
idle = id
break
}
if idle == "" {
if len(idlers.ids) == 0 {
return
}
peer := s.peers[idle]
var (
idle = idlers.ids[0]
peer = s.peers[idle]
cap = idlers.caps[0]
)
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
// Matched a pending task to an idle peer, allocate a unique request id
var reqid uint64
@ -1363,16 +1404,15 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
break
}
// Generate the network query and send it to the peer
cap := int(s.rates.Capacity(idle, ByteCodesMsg, s.rates.TargetRoundTrip()))
if cap > maxCodeRequestCount {
cap = maxCodeRequestCount
}
hashes := make([]common.Hash, 0, cap)
hashes := make([]common.Hash, 0, int(cap))
for hash := range s.healer.codeTasks {
delete(s.healer.codeTasks, hash)
hashes = append(hashes, hash)
if len(hashes) >= cap {
if len(hashes) >= int(cap) {
break
}
}
@ -2630,6 +2670,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
return nil
}
delete(s.bytecodeHealReqs, id)
s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
// Clean up the request timeout timer, we'll see how to proceed further based
// on the actual delivered content
@ -2652,7 +2693,6 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
return nil
}
s.lock.Unlock()
s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
// Cross reference the requested bytecodes with the response to find gaps
// that the serving node is missing
@ -2806,3 +2846,23 @@ func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) {
}
return space.Uint64() - uint64(hashes), nil
}
// capacitySort pairs peer ids with their estimated message throughput and
// satisfies sort.Interface so that peers can be ordered by capacity, largest
// capacity first. The two slices are parallel: caps[i] belongs to ids[i].
type capacitySort struct {
	ids  []string
	caps []float64
}

// Len reports how many peers are tracked (the length of the id list).
func (cs *capacitySort) Len() int {
	n := len(cs.ids)
	return n
}

// Less reports whether the peer at index i must be ordered before the peer at
// index j, which is the case when j has a strictly lower capacity than i.
func (cs *capacitySort) Less(i, j int) bool {
	return cs.caps[j] < cs.caps[i]
}

// Swap exchanges the peers at indices i and j, moving both the id and its
// capacity so the parallel slices stay aligned.
func (cs *capacitySort) Swap(i, j int) {
	idI, capI := cs.ids[i], cs.caps[i]
	cs.ids[i], cs.caps[i] = cs.ids[j], cs.caps[j]
	cs.ids[j], cs.caps[j] = idI, capI
}

Loading…
Cancel
Save