|
|
|
@ -313,11 +313,12 @@ func (s *stateSync) loop() (err error) { |
|
|
|
|
s.d.dropPeer(req.peer.id) |
|
|
|
|
} |
|
|
|
|
// Process all the received blobs and check for stale delivery
|
|
|
|
|
if err = s.process(req); err != nil { |
|
|
|
|
delivered, err := s.process(req) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Warn("Node data write error", "err", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
req.peer.SetNodeDataIdle(len(req.response)) |
|
|
|
|
req.peer.SetNodeDataIdle(delivered) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
@ -398,9 +399,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { |
|
|
|
|
// process iterates over a batch of delivered state data, injecting each item
|
|
|
|
|
// into a running state sync, re-queuing any items that were requested but not
|
|
|
|
|
// delivered.
|
|
|
|
|
func (s *stateSync) process(req *stateReq) error { |
|
|
|
|
// Returns whether the peer actually managed to deliver anything of value,
|
|
|
|
|
// and any error that occurred
|
|
|
|
|
func (s *stateSync) process(req *stateReq) (int, error) { |
|
|
|
|
// Collect processing stats and update progress if valid data was received
|
|
|
|
|
duplicate, unexpected := 0, 0 |
|
|
|
|
duplicate, unexpected, successful := 0, 0, 0 |
|
|
|
|
|
|
|
|
|
defer func(start time.Time) { |
|
|
|
|
if duplicate > 0 || unexpected > 0 { |
|
|
|
@ -410,7 +413,6 @@ func (s *stateSync) process(req *stateReq) error { |
|
|
|
|
|
|
|
|
|
// Iterate over all the delivered data and inject one-by-one into the trie
|
|
|
|
|
progress := false |
|
|
|
|
|
|
|
|
|
for _, blob := range req.response { |
|
|
|
|
prog, hash, err := s.processNodeData(blob) |
|
|
|
|
switch err { |
|
|
|
@ -418,12 +420,13 @@ func (s *stateSync) process(req *stateReq) error { |
|
|
|
|
s.numUncommitted++ |
|
|
|
|
s.bytesUncommitted += len(blob) |
|
|
|
|
progress = progress || prog |
|
|
|
|
successful++ |
|
|
|
|
case trie.ErrNotRequested: |
|
|
|
|
unexpected++ |
|
|
|
|
case trie.ErrAlreadyProcessed: |
|
|
|
|
duplicate++ |
|
|
|
|
default: |
|
|
|
|
return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) |
|
|
|
|
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) |
|
|
|
|
} |
|
|
|
|
if _, ok := req.tasks[hash]; ok { |
|
|
|
|
delete(req.tasks, hash) |
|
|
|
@ -441,12 +444,12 @@ func (s *stateSync) process(req *stateReq) error { |
|
|
|
|
// If we've requested the node too many times already, it may be a malicious
|
|
|
|
|
// sync where nobody has the right data. Abort.
|
|
|
|
|
if len(task.attempts) >= npeers { |
|
|
|
|
return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) |
|
|
|
|
return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) |
|
|
|
|
} |
|
|
|
|
// Missing item, place into the retry queue.
|
|
|
|
|
s.tasks[hash] = task |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
return successful, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// processNodeData tries to inject a trie node data blob delivered from a remote
|
|
|
|
|