|
|
|
@ -265,18 +265,18 @@ type RpcErrorObject struct { |
|
|
|
|
// Data interface{} `json:"data"`
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type ListenerStoppedError struct { |
|
|
|
|
type listenerHasStoppedError struct { |
|
|
|
|
msg string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self ListenerStoppedError) Error() string { |
|
|
|
|
func (self listenerHasStoppedError) Error() string { |
|
|
|
|
return self.msg |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var listenerStoppedError = ListenerStoppedError{"Listener stopped"} |
|
|
|
|
var listenerStoppedError = listenerHasStoppedError{"Listener stopped"} |
|
|
|
|
|
|
|
|
|
// When https://github.com/golang/go/issues/4674 is fixed this could be replaced
|
|
|
|
|
type StoppableTCPListener struct { |
|
|
|
|
type stoppableTCPListener struct { |
|
|
|
|
*net.TCPListener |
|
|
|
|
stop chan struct{} // closed when the listener must stop
|
|
|
|
|
} |
|
|
|
@ -284,7 +284,7 @@ type StoppableTCPListener struct { |
|
|
|
|
// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an
|
|
|
|
|
// error indicating that the service was stopped. This will only happen for connections which are
|
|
|
|
|
// kept open (HTTP keep-alive) when the RPC service was shutdown.
|
|
|
|
|
func NewStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { |
|
|
|
|
func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { |
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
select { |
|
|
|
|
case <-stop: |
|
|
|
@ -298,11 +298,11 @@ func NewStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Stop the listener and all accepted and still active connections.
|
|
|
|
|
func (self *StoppableTCPListener) Stop() { |
|
|
|
|
func (self *stoppableTCPListener) Stop() { |
|
|
|
|
close(self.stop) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewStoppableTCPListener(addr string) (*StoppableTCPListener, error) { |
|
|
|
|
func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) { |
|
|
|
|
wl, err := net.Listen("tcp", addr) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
@ -310,14 +310,14 @@ func NewStoppableTCPListener(addr string) (*StoppableTCPListener, error) { |
|
|
|
|
|
|
|
|
|
if tcpl, ok := wl.(*net.TCPListener); ok { |
|
|
|
|
stop := make(chan struct{}) |
|
|
|
|
l := &StoppableTCPListener{tcpl, stop} |
|
|
|
|
l := &stoppableTCPListener{tcpl, stop} |
|
|
|
|
return l, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil, errors.New("Unable to create TCP listener for RPC service") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self *StoppableTCPListener) Accept() (net.Conn, error) { |
|
|
|
|
func (self *stoppableTCPListener) Accept() (net.Conn, error) { |
|
|
|
|
for { |
|
|
|
|
self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second))) |
|
|
|
|
c, err := self.TCPListener.AcceptTCP() |
|
|
|
@ -338,16 +338,16 @@ func (self *StoppableTCPListener) Accept() (net.Conn, error) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &ClosableConnection{c, self.stop}, err |
|
|
|
|
return &closableConnection{c, self.stop}, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type ClosableConnection struct { |
|
|
|
|
type closableConnection struct { |
|
|
|
|
*net.TCPConn |
|
|
|
|
closed chan struct{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self *ClosableConnection) Read(b []byte) (n int, err error) { |
|
|
|
|
func (self *closableConnection) Read(b []byte) (n int, err error) { |
|
|
|
|
select { |
|
|
|
|
case <-self.closed: |
|
|
|
|
self.TCPConn.Close() |
|
|
|
|