@ -18,7 +18,9 @@ package rpc
import (
import (
"context"
"context"
"errors"
"io"
"io"
"net"
"sync"
"sync"
"sync/atomic"
"sync/atomic"
@ -151,8 +153,8 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
reqs , batch , err := codec . readBatch ( )
reqs , batch , err := codec . readBatch ( )
if err != nil {
if err != nil {
if err != io . EOF {
if msg := messageForReadError ( err ) ; msg != "" {
resp := errorMessage ( & invalidMessageError { "parse error" } )
resp := errorMessage ( & invalidMessageError { msg } )
codec . writeJSON ( ctx , resp , true )
codec . writeJSON ( ctx , resp , true )
}
}
return
return
@ -164,6 +166,20 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
}
}
}
}
func messageForReadError ( err error ) string {
var netErr net . Error
if errors . As ( err , & netErr ) {
if netErr . Timeout ( ) {
return "read timeout"
} else {
return "read error"
}
} else if err != io . EOF {
return "parse error"
}
return ""
}
// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
// requests to finish, then closes all codecs which will cancel pending requests and
// requests to finish, then closes all codecs which will cancel pending requests and
// subscriptions.
// subscriptions.