From 8dcbdcad0a92b053c7a5da2dc00b679c0044d050 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 15 Jun 2015 13:42:44 +0200 Subject: [PATCH 1/2] p2p: track write errors and prevent writes during shutdown As of this commit, we no longer rely on the protocol handler to report write errors in a timely fashion. When a write fails, shutdown is initiated immediately and no new writes can start. This will also prevent new writes from starting after Server.Stop has been called. --- p2p/peer.go | 82 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index cbe5ccc84a..5489273bdd 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -115,37 +115,54 @@ func newPeer(conn *conn, protocols []Protocol) *Peer { } func (p *Peer) run() DiscReason { - readErr := make(chan error, 1) + var ( + writeStart = make(chan struct{}, 1) + writeErr = make(chan error, 1) + readErr = make(chan error, 1) + reason DiscReason + requested bool + ) p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() - p.startProtocols() + // Start all protocol handlers. + writeStart <- struct{}{} + p.startProtocols(writeStart, writeErr) // Wait for an error or disconnect. - var ( - reason DiscReason - requested bool - ) - select { - case err := <-readErr: - if r, ok := err.(DiscReason); ok { - reason = r - } else { - // Note: We rely on protocols to abort if there is a write - // error. It might be more robust to handle them here as well. - glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err) - reason = DiscNetworkError +loop: + for { + select { + case err := <-writeErr: + // A write finished. Allow the next write to start if + // there was no error. + if err != nil { + glog.V(logger.Detail).Infof("%v: Write error: %v\n", p, err) + reason = DiscNetworkError + break loop + } + writeStart <- struct{}{} + case err := <-readErr: + if r, ok := err.(DiscReason); ok { + reason = r + } else { + glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err) + reason = DiscNetworkError + } + break loop + case err := <-p.protoErr: + reason = discReasonForError(err) + break loop + case reason = <-p.disc: + requested = true + break loop } - case err := <-p.protoErr: - reason = discReasonForError(err) - case reason = <-p.disc: - requested = true } + close(p.closed) p.rw.close(reason) p.wg.Wait() - if requested { reason = DiscRequested } @@ -247,11 +264,13 @@ outer: return result } -func (p *Peer) startProtocols() { +func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) { p.wg.Add(len(p.running)) for _, proto := range p.running { proto := proto proto.closed = p.closed + proto.wstart = writeStart + proto.werr = writeErr glog.V(logger.Detail).Infof("%v: Starting protocol %s/%d\n", p, proto.Name, proto.Version) go func() { err := proto.Run(p, proto) @@ -280,18 +299,31 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) { type protoRW struct { Protocol - in chan Msg - closed <-chan struct{} + in chan Msg // receices read messages + closed <-chan struct{} // receives when peer is shutting down + wstart <-chan struct{} // receives when write may start + werr chan<- error // for write results offset uint64 w MsgWriter } -func (rw *protoRW) WriteMsg(msg Msg) error { +func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset - return rw.w.WriteMsg(msg) + select { + case <-rw.wstart: + err = rw.w.WriteMsg(msg) + // Report write status back to Peer.run. It will initiate + // shutdown if the error is non-nil and unblock the next write + // otherwise. The calling protocol code should exit for errors + // as well but we don't want to rely on that. + rw.werr <- err + case <-rw.closed: + err = fmt.Errorf("shutting down") + } + return err } func (rw *protoRW) ReadMsg() (Msg, error) { From 70da79f04c14e562c024e85c6b081b6b4b8e45ec Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 15 Jun 2015 14:00:50 +0200 Subject: [PATCH 2/2] p2p: improve disconnect logging --- p2p/peer.go | 11 ++++++----- p2p/peer_test.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 5489273bdd..40466cf846 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -138,24 +138,27 @@ loop: // A write finished. Allow the next write to start if // there was no error. if err != nil { - glog.V(logger.Detail).Infof("%v: Write error: %v\n", p, err) + glog.V(logger.Detail).Infof("%v: write error: %v\n", p, err) reason = DiscNetworkError break loop } writeStart <- struct{}{} case err := <-readErr: if r, ok := err.(DiscReason); ok { + glog.V(logger.Debug).Infof("%v: remote requested disconnect: %v\n", p, r) + requested = true reason = r } else { - glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err) + glog.V(logger.Detail).Infof("%v: read error: %v\n", p, err) reason = DiscNetworkError } break loop case err := <-p.protoErr: reason = discReasonForError(err) + glog.V(logger.Debug).Infof("%v: protocol error: %v (%v)\n", p, err, reason) break loop case reason = <-p.disc: - requested = true + glog.V(logger.Debug).Infof("%v: locally requested disconnect: %v\n", p, reason) break loop } } @@ -166,7 +169,6 @@ loop: if requested { reason = DiscRequested } - glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason) return reason } @@ -213,7 +215,6 @@ func (p *Peer) handle(msg Msg) error { // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) - glog.V(logger.Debug).Infof("%v: Disconnect Requested: %v\n", p, reason[0]) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 7b772e1988..575d0ff793 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -121,7 +121,7 @@ func TestPeerDisconnect(t *testing.T) { } select { case reason := <-disc: - if reason != DiscQuitting { + if reason != DiscRequested { t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested) } case <-time.After(500 * time.Millisecond):