|
|
|
@ -40,6 +40,7 @@ type stateReq struct { |
|
|
|
|
timer *time.Timer // Timer to fire when the RTT timeout expires
|
|
|
|
|
peer *peerConnection // Peer that we're requesting from
|
|
|
|
|
response [][]byte // Response data of the peer (nil for timeouts)
|
|
|
|
|
dropped bool // Flag whether the peer dropped off early
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// timedOut returns if this request timed out.
|
|
|
|
@ -105,6 +106,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { |
|
|
|
|
go s.run() |
|
|
|
|
defer s.Cancel() |
|
|
|
|
|
|
|
|
|
// Listen for peer departure events to cancel assigned tasks
|
|
|
|
|
peerDrop := make(chan *peerConnection, 1024) |
|
|
|
|
peerSub := s.d.peers.SubscribePeerDrops(peerDrop) |
|
|
|
|
defer peerSub.Unsubscribe() |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
// Enable sending of the first buffered element if there is one.
|
|
|
|
|
var ( |
|
|
|
@ -143,6 +149,20 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { |
|
|
|
|
finished = append(finished, req) |
|
|
|
|
delete(active, pack.PeerId()) |
|
|
|
|
|
|
|
|
|
// Handle dropped peer connections:
|
|
|
|
|
case p := <-peerDrop: |
|
|
|
|
// Skip if no request is currently pending
|
|
|
|
|
req := active[p.id] |
|
|
|
|
if req == nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Finalize the request and queue up for processing
|
|
|
|
|
req.timer.Stop() |
|
|
|
|
req.dropped = true |
|
|
|
|
|
|
|
|
|
finished = append(finished, req) |
|
|
|
|
delete(active, p.id) |
|
|
|
|
|
|
|
|
|
// Handle timed-out requests:
|
|
|
|
|
case req := <-timeout: |
|
|
|
|
// If the peer is already requesting something else, ignore the stale timeout.
|
|
|
|
@ -167,6 +187,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { |
|
|
|
|
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) |
|
|
|
|
|
|
|
|
|
// Make sure the previous one doesn't get siletly lost
|
|
|
|
|
old.timer.Stop() |
|
|
|
|
old.dropped = true |
|
|
|
|
|
|
|
|
|
finished = append(finished, old) |
|
|
|
|
} |
|
|
|
|
// Start a timer to notify the sync loop if the peer stalled.
|
|
|
|
@ -269,9 +292,9 @@ func (s *stateSync) loop() error { |
|
|
|
|
return errCancelStateFetch |
|
|
|
|
|
|
|
|
|
case req := <-s.deliver: |
|
|
|
|
// Response or timeout triggered, drop the peer if stalling
|
|
|
|
|
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut()) |
|
|
|
|
if len(req.items) <= 2 && req.timedOut() { |
|
|
|
|
// Response, disconnect or timeout triggered, drop the peer if stalling
|
|
|
|
|
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) |
|
|
|
|
if len(req.items) <= 2 && !req.dropped && req.timedOut() { |
|
|
|
|
// 2 items are the minimum requested, if even that times out, we've no use of
|
|
|
|
|
// this peer at the moment.
|
|
|
|
|
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) |
|
|
|
|