diff --git a/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go b/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go
index 862c3def42..3f4606af0f 100644
--- a/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go
+++ b/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go
@@ -9,6 +9,8 @@ import (
"net/http"
"sync"
"time"
+
+ "github.com/ethereum/go-ethereum/fdtrack"
)
// HTTPUClient is a client for dealing with HTTPU (HTTP over UDP). Its typical
@@ -25,6 +27,7 @@ func NewHTTPUClient() (*HTTPUClient, error) {
if err != nil {
return nil, err
}
+ fdtrack.Open("upnp")
return &HTTPUClient{conn: conn}, nil
}
@@ -33,6 +36,7 @@ func NewHTTPUClient() (*HTTPUClient, error) {
func (httpu *HTTPUClient) Close() error {
httpu.connLock.Lock()
defer httpu.connLock.Unlock()
+ fdtrack.Close("upnp")
return httpu.conn.Close()
}
diff --git a/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go b/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go
index 815610734c..786ce6fa84 100644
--- a/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go
+++ b/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go
@@ -7,9 +7,12 @@ import (
"encoding/xml"
"fmt"
"io/ioutil"
+ "net"
"net/http"
"net/url"
"reflect"
+
+ "github.com/ethereum/go-ethereum/fdtrack"
)
const (
@@ -26,6 +29,17 @@ type SOAPClient struct {
func NewSOAPClient(endpointURL url.URL) *SOAPClient {
return &SOAPClient{
EndpointURL: endpointURL,
+ HTTPClient: http.Client{
+ Transport: &http.Transport{
+ Dial: func(network, addr string) (net.Conn, error) {
+ c, err := net.Dial(network, addr)
+ if c != nil {
+ c = fdtrack.WrapConn("upnp", c)
+ }
+ return c, err
+ },
+ },
+ },
}
}
diff --git a/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go b/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go
index 8ce4e8342e..b165c784e3 100644
--- a/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go
+++ b/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go
@@ -5,6 +5,8 @@ import (
"log"
"net"
"time"
+
+ "github.com/ethereum/go-ethereum/fdtrack"
)
// Implement the NAT-PMP protocol, typically supported by Apple routers and open source
@@ -102,6 +104,8 @@ func (n *Client) rpc(msg []byte, resultSize int) (result []byte, err error) {
if err != nil {
return
}
+ fdtrack.Open("natpmp")
+ defer fdtrack.Close("natpmp")
defer conn.Close()
result = make([]byte, resultSize)
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
index 46cc9d0701..6d44f74b38 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go
@@ -18,6 +18,7 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/syndtr/goleveldb/leveldb/util"
)
@@ -369,6 +370,8 @@ func (fw fileWrap) Close() error {
err := fw.File.Close()
if err != nil {
f.fs.log(fmt.Sprintf("close %s.%d: %v", f.Type(), f.Num(), err))
+ } else {
+ fdtrack.Close("leveldb")
}
return err
}
@@ -400,6 +403,7 @@ func (f *file) Open() (Reader, error) {
return nil, err
}
ok:
+ fdtrack.Open("leveldb")
f.open = true
f.fs.open++
return fileWrap{of, f}, nil
@@ -418,6 +422,7 @@ func (f *file) Create() (Writer, error) {
if err != nil {
return nil, err
}
+ fdtrack.Open("leveldb")
f.open = true
f.fs.open++
return fileWrap{of, f}, nil
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index 511e0df9dd..00dd8f7535 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/metrics"
@@ -532,6 +533,9 @@ func startEth(ctx *cli.Context, eth *eth.Ethereum) {
// Start Ethereum itself
utils.StartEthereum(eth)
+ // Start logging file descriptor stats.
+ fdtrack.Start()
+
am := eth.AccountManager()
account := ctx.GlobalString(utils.UnlockedAccountFlag.Name)
accounts := strings.Split(account, " ")
diff --git a/fdtrack/fdtrack.go b/fdtrack/fdtrack.go
new file mode 100644
index 0000000000..2f5ab57f44
--- /dev/null
+++ b/fdtrack/fdtrack.go
@@ -0,0 +1,112 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// Package fdtrack logs statistics about open file descriptors.
+package fdtrack
+
+import (
+ "fmt"
+ "net"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+var (
+ mutex sync.Mutex
+ all = make(map[string]int)
+)
+
+func Open(desc string) {
+ mutex.Lock()
+ all[desc] += 1
+ mutex.Unlock()
+}
+
+func Close(desc string) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ if c, ok := all[desc]; ok {
+ if c == 1 {
+ delete(all, desc)
+ } else {
+ all[desc]--
+ }
+ }
+}
+
+func WrapListener(desc string, l net.Listener) net.Listener {
+ Open(desc)
+ return &wrappedListener{l, desc}
+}
+
+type wrappedListener struct {
+ net.Listener
+ desc string
+}
+
+func (w *wrappedListener) Accept() (net.Conn, error) {
+ c, err := w.Listener.Accept()
+ if err == nil {
+ c = WrapConn(w.desc, c)
+ }
+ return c, err
+}
+
+func (w *wrappedListener) Close() error {
+ err := w.Listener.Close()
+ if err == nil {
+ Close(w.desc)
+ }
+ return err
+}
+
+func WrapConn(desc string, conn net.Conn) net.Conn {
+ Open(desc)
+ return &wrappedConn{conn, desc}
+}
+
+type wrappedConn struct {
+ net.Conn
+ desc string
+}
+
+func (w *wrappedConn) Close() error {
+ err := w.Conn.Close()
+ if err == nil {
+ Close(w.desc)
+ }
+ return err
+}
+
+func Start() {
+ go func() {
+ for range time.Tick(15 * time.Second) {
+ mutex.Lock()
+ var sum, tracked = 0, []string{}
+ for what, n := range all {
+ sum += n
+ tracked = append(tracked, fmt.Sprintf("%s:%d", what, n))
+ }
+ mutex.Unlock()
+ used, _ := fdusage()
+ sort.Strings(tracked)
+ glog.Infof("fd usage %d/%d, tracked %d %v", used, fdlimit(), sum, tracked)
+ }
+ }()
+}
diff --git a/fdtrack/fdusage.go b/fdtrack/fdusage.go
new file mode 100644
index 0000000000..689625a8f5
--- /dev/null
+++ b/fdtrack/fdusage.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// +build !linux,!darwin
+
+package fdtrack
+
+import "errors"
+
+func fdlimit() int {
+ return 0
+}
+
+func fdusage() (int, error) {
+ return 0, errors.New("not implemented")
+}
diff --git a/fdtrack/fdusage_darwin.go b/fdtrack/fdusage_darwin.go
new file mode 100644
index 0000000000..04a3a9baf7
--- /dev/null
+++ b/fdtrack/fdusage_darwin.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// +build darwin
+
+package fdtrack
+
+import (
+ "os"
+ "syscall"
+ "unsafe"
+)
+
+// #cgo CFLAGS: -lproc
+// #include
+// #include
+import "C"
+
+func fdlimit() int {
+ var nofile syscall.Rlimit
+ if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &nofile); err != nil {
+ return 0
+ }
+ return int(nofile.Cur)
+}
+
+func fdusage() (int, error) {
+ pid := C.int(os.Getpid())
+ // Query for a rough estimate on the amout of data that
+ // proc_pidinfo will return.
+ rlen, err := C.proc_pidinfo(pid, C.PROC_PIDLISTFDS, 0, nil, 0)
+ if rlen <= 0 {
+ return 0, err
+ }
+ // Load the list of file descriptors. We don't actually care about
+ // the content, only about the size. Since the number of fds can
+ // change while we're reading them, the loop enlarges the buffer
+ // until proc_pidinfo says the result fitted.
+ var buf unsafe.Pointer
+ defer func() {
+ if buf != nil {
+ C.free(buf)
+ }
+ }()
+ for buflen := rlen; ; buflen *= 2 {
+ buf, err = C.reallocf(buf, C.size_t(buflen))
+ if buf == nil {
+ return 0, err
+ }
+ rlen, err = C.proc_pidinfo(pid, C.PROC_PIDLISTFDS, 0, buf, buflen)
+ if rlen <= 0 {
+ return 0, err
+ } else if rlen == buflen {
+ continue
+ }
+ return int(rlen / C.PROC_PIDLISTFD_SIZE), nil
+ }
+ panic("unreachable")
+}
diff --git a/fdtrack/fdusage_linux.go b/fdtrack/fdusage_linux.go
new file mode 100644
index 0000000000..d9a856a0ca
--- /dev/null
+++ b/fdtrack/fdusage_linux.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// +build linux
+
+package fdtrack
+
+import (
+ "io"
+ "os"
+ "syscall"
+)
+
+func fdlimit() int {
+ var nofile syscall.Rlimit
+ if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &nofile); err != nil {
+ return 0
+ }
+ return int(nofile.Cur)
+}
+
+func fdusage() (int, error) {
+ f, err := os.Open("/proc/self/fd")
+ if err != nil {
+ return 0, err
+ }
+ defer f.Close()
+ const batchSize = 100
+ n := 0
+ for {
+ list, err := f.Readdirnames(batchSize)
+ n += len(list)
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ return 0, err
+ }
+ }
+ return n, nil
+}
diff --git a/metrics/disk_linux.go b/metrics/disk_linux.go
index e0c8a1a3a7..8967d490e8 100644
--- a/metrics/disk_linux.go
+++ b/metrics/disk_linux.go
@@ -34,6 +34,7 @@ func ReadDiskStats(stats *DiskStats) error {
if err != nil {
return err
}
+ defer inf.Close()
in := bufio.NewReader(inf)
// Iterate over the IO counter, and extract what we need
diff --git a/p2p/dial.go b/p2p/dial.go
index 0fd3a4cf52..8b210bacd2 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -23,6 +23,7 @@ import (
"net"
"time"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -212,6 +213,7 @@ func (t *dialTask) Do(srv *Server) {
glog.V(logger.Detail).Infof("dial error: %v", err)
return
}
+ fd = fdtrack.WrapConn("p2p", fd)
mfd := newMeteredConn(fd, false)
srv.setupConn(mfd, t.flags, t.dest)
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 4e6ecaf237..d7ca9000d4 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p/nat"
@@ -197,6 +198,7 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP
if err != nil {
return nil, err
}
+ fdtrack.Open("p2p")
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
@@ -234,6 +236,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath strin
func (t *udp) close() {
close(t.closing)
+ fdtrack.Close("p2p")
t.conn.Close()
// TODO: wait for the loops to end.
}
diff --git a/p2p/metrics.go b/p2p/metrics.go
index f98cac2742..8ee4ed04b6 100644
--- a/p2p/metrics.go
+++ b/p2p/metrics.go
@@ -34,7 +34,7 @@ var (
// meteredConn is a wrapper around a network TCP connection that meters both the
// inbound and outbound network traffic.
type meteredConn struct {
- *net.TCPConn // Network connection to wrap with metering
+ net.Conn
}
// newMeteredConn creates a new metered connection, also bumping the ingress or
@@ -45,13 +45,13 @@ func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
} else {
egressConnectMeter.Mark(1)
}
- return &meteredConn{conn.(*net.TCPConn)}
+ return &meteredConn{conn}
}
// Read delegates a network read to the underlying connection, bumping the ingress
// traffic meter along the way.
func (c *meteredConn) Read(b []byte) (n int, err error) {
- n, err = c.TCPConn.Read(b)
+ n, err = c.Conn.Read(b)
ingressTrafficMeter.Mark(int64(n))
return
}
@@ -59,7 +59,7 @@ func (c *meteredConn) Read(b []byte) (n int, err error) {
// Write delegates a network write to the underlying connection, bumping the
// egress traffic meter along the way.
func (c *meteredConn) Write(b []byte) (n int, err error) {
- n, err = c.TCPConn.Write(b)
+ n, err = c.Conn.Write(b)
egressTrafficMeter.Mark(int64(n))
return
}
diff --git a/p2p/server.go b/p2p/server.go
index ba83c55035..7351a26544 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -372,7 +373,7 @@ func (srv *Server) startListening() error {
}
laddr := listener.Addr().(*net.TCPAddr)
srv.ListenAddr = laddr.String()
- srv.listener = listener
+ srv.listener = fdtrack.WrapListener("p2p", listener)
srv.loopWG.Add(1)
go srv.listenLoop()
// Map the TCP listening port if NAT is configured.
diff --git a/rpc/comms/http.go b/rpc/comms/http.go
index 108ba0c5fd..c08b744a13 100644
--- a/rpc/comms/http.go
+++ b/rpc/comms/http.go
@@ -17,13 +17,19 @@
package comms
import (
+ "encoding/json"
"fmt"
+ "net"
"net/http"
"strings"
+ "sync"
+ "time"
"bytes"
+ "io"
"io/ioutil"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc/codec"
@@ -31,10 +37,15 @@ import (
"github.com/rs/cors"
)
+const (
+ serverIdleTimeout = 10 * time.Second // idle keep-alive connections
+ serverReadTimeout = 15 * time.Second // per-request read timeout
+ serverWriteTimeout = 15 * time.Second // per-request read timeout
+)
+
var (
- // main HTTP rpc listener
- httpListener *stoppableTCPListener
- listenerStoppedError = fmt.Errorf("Listener has stopped")
+ httpServerMu sync.Mutex
+ httpServer *stopServer
)
type HttpConfig struct {
@@ -43,42 +54,172 @@ type HttpConfig struct {
CorsDomain string
}
+// stopServer augments http.Server with idle connection tracking.
+// Idle keep-alive connections are shut down when Close is called.
+type stopServer struct {
+ *http.Server
+ l net.Listener
+ // connection tracking state
+ mu sync.Mutex
+ shutdown bool // true when Stop has returned
+ idle map[net.Conn]struct{}
+}
+
+type handler struct {
+ codec codec.Codec
+ api shared.EthereumApi
+}
+
+// StartHTTP starts listening for RPC requests sent via HTTP.
func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error {
- if httpListener != nil {
- if fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) != httpListener.Addr().String() {
- return fmt.Errorf("RPC service already running on %s ", httpListener.Addr().String())
+ httpServerMu.Lock()
+ defer httpServerMu.Unlock()
+
+ addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)
+ if httpServer != nil {
+ if addr != httpServer.Addr {
+ return fmt.Errorf("RPC service already running on %s ", httpServer.Addr)
}
return nil // RPC service already running on given host/port
}
-
- l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort))
+ // Set up the request handler, wrapping it with CORS headers if configured.
+ handler := http.Handler(&handler{codec, api})
+ if len(cfg.CorsDomain) > 0 {
+ opts := cors.Options{
+ AllowedMethods: []string{"POST"},
+ AllowedOrigins: strings.Split(cfg.CorsDomain, " "),
+ }
+ handler = cors.New(opts).Handler(handler)
+ }
+ // Start the server.
+ s, err := listenHTTP(addr, handler)
if err != nil {
glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err)
return err
}
- httpListener = l
+ httpServer = s
+ return nil
+}
- var handler http.Handler
- if len(cfg.CorsDomain) > 0 {
- var opts cors.Options
- opts.AllowedMethods = []string{"POST"}
- opts.AllowedOrigins = strings.Split(cfg.CorsDomain, " ")
+func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
- c := cors.New(opts)
- handler = newStoppableHandler(c.Handler(gethHttpHandler(codec, api)), l.stop)
- } else {
- handler = newStoppableHandler(gethHttpHandler(codec, api), l.stop)
+ // Limit request size to resist DoS
+ if req.ContentLength > maxHttpSizeReqLength {
+ err := fmt.Errorf("Request too large")
+ response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
+ sendJSON(w, &response)
+ return
}
- go http.Serve(l, handler)
+ defer req.Body.Close()
+ payload, err := ioutil.ReadAll(req.Body)
+ if err != nil {
+ err := fmt.Errorf("Could not read request body")
+ response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
+ sendJSON(w, &response)
+ return
+ }
- return nil
+ c := h.codec.New(nil)
+ var rpcReq shared.Request
+ if err = c.Decode(payload, &rpcReq); err == nil {
+ reply, err := h.api.Execute(&rpcReq)
+ res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
+ sendJSON(w, &res)
+ return
+ }
+
+ var reqBatch []shared.Request
+ if err = c.Decode(payload, &reqBatch); err == nil {
+ resBatch := make([]*interface{}, len(reqBatch))
+ resCount := 0
+ for i, rpcReq := range reqBatch {
+ reply, err := h.api.Execute(&rpcReq)
+ if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
+ resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
+ resCount += 1
+ }
+ }
+ // make response omitting nil entries
+ sendJSON(w, resBatch[:resCount])
+ return
+ }
+
+ // invalid request
+ err = fmt.Errorf("Could not decode request")
+ res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
+ sendJSON(w, res)
}
+func sendJSON(w io.Writer, v interface{}) {
+ if glog.V(logger.Detail) {
+ if payload, err := json.MarshalIndent(v, "", "\t"); err == nil {
+ glog.Infof("Sending payload: %s", payload)
+ }
+ }
+ if err := json.NewEncoder(w).Encode(v); err != nil {
+ glog.V(logger.Error).Infoln("Error sending JSON:", err)
+ }
+}
+
+// Stop closes all active HTTP connections and shuts down the server.
func StopHttp() {
- if httpListener != nil {
- httpListener.Stop()
- httpListener = nil
+ httpServerMu.Lock()
+ defer httpServerMu.Unlock()
+ if httpServer != nil {
+ httpServer.Close()
+ httpServer = nil
+ }
+}
+
+func listenHTTP(addr string, h http.Handler) (*stopServer, error) {
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+ l = fdtrack.WrapListener("rpc", l)
+ s := &stopServer{l: l, idle: make(map[net.Conn]struct{})}
+ s.Server = &http.Server{
+ Addr: addr,
+ Handler: h,
+ ReadTimeout: serverReadTimeout,
+ WriteTimeout: serverWriteTimeout,
+ ConnState: s.connState,
+ }
+ go s.Serve(l)
+ return s, nil
+}
+
+func (s *stopServer) connState(c net.Conn, state http.ConnState) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // Close c immediately if we're past shutdown.
+ if s.shutdown {
+ if state != http.StateClosed {
+ c.Close()
+ }
+ return
+ }
+ if state == http.StateIdle {
+ s.idle[c] = struct{}{}
+ } else {
+ delete(s.idle, c)
+ }
+}
+
+func (s *stopServer) Close() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ // Shut down the acceptor. No new connections can be created.
+ s.l.Close()
+ // Drop all idle connections. Non-idle connections will be
+ // closed by connState as soon as they become idle.
+ s.shutdown = true
+ for c := range s.idle {
+ glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr())
+ c.Close()
+ delete(s.idle, c)
}
}
diff --git a/rpc/comms/http_net.go b/rpc/comms/http_net.go
deleted file mode 100644
index dba2029d40..0000000000
--- a/rpc/comms/http_net.go
+++ /dev/null
@@ -1,182 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package comms
-
-import (
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "time"
-
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/rpc/codec"
- "github.com/ethereum/go-ethereum/rpc/shared"
-)
-
-// When https://github.com/golang/go/issues/4674 is implemented this could be replaced
-type stoppableTCPListener struct {
- *net.TCPListener
- stop chan struct{} // closed when the listener must stop
-}
-
-func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) {
- wl, err := net.Listen("tcp", addr)
- if err != nil {
- return nil, err
- }
-
- if tcpl, ok := wl.(*net.TCPListener); ok {
- stop := make(chan struct{})
- return &stoppableTCPListener{tcpl, stop}, nil
- }
-
- return nil, fmt.Errorf("Unable to create TCP listener for RPC service")
-}
-
-// Stop the listener and all accepted and still active connections.
-func (self *stoppableTCPListener) Stop() {
- close(self.stop)
-}
-
-func (self *stoppableTCPListener) Accept() (net.Conn, error) {
- for {
- self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second)))
- c, err := self.TCPListener.AcceptTCP()
-
- select {
- case <-self.stop:
- if c != nil { // accept timeout
- c.Close()
- }
- self.TCPListener.Close()
- return nil, listenerStoppedError
- default:
- }
-
- if err != nil {
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() {
- continue // regular timeout
- }
- }
-
- return &closableConnection{c, self.stop}, err
- }
-}
-
-type closableConnection struct {
- *net.TCPConn
- closed chan struct{}
-}
-
-func (self *closableConnection) Read(b []byte) (n int, err error) {
- select {
- case <-self.closed:
- self.TCPConn.Close()
- return 0, io.EOF
- default:
- return self.TCPConn.Read(b)
- }
-}
-
-// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an
-// error indicating that the service was stopped. This will only happen for connections which are
-// kept open (HTTP keep-alive) when the RPC service was shutdown.
-func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- select {
- case <-stop:
- w.Header().Set("Content-Type", "application/json")
- err := fmt.Errorf("RPC service stopped")
- response := shared.NewRpcResponse(-1, shared.JsonRpcVersion, nil, err)
- httpSend(w, response)
- default:
- h.ServeHTTP(w, r)
- }
- })
-}
-
-func httpSend(writer io.Writer, v interface{}) (n int, err error) {
- var payload []byte
- payload, err = json.MarshalIndent(v, "", "\t")
- if err != nil {
- glog.V(logger.Error).Infoln("Error marshalling JSON", err)
- return 0, err
- }
- glog.V(logger.Detail).Infof("Sending payload: %s", payload)
-
- return writer.Write(payload)
-}
-
-func gethHttpHandler(codec codec.Codec, a shared.EthereumApi) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- w.Header().Set("Content-Type", "application/json")
-
- // Limit request size to resist DoS
- if req.ContentLength > maxHttpSizeReqLength {
- err := fmt.Errorf("Request too large")
- response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
- httpSend(w, &response)
- return
- }
-
- defer req.Body.Close()
- payload, err := ioutil.ReadAll(req.Body)
- if err != nil {
- err := fmt.Errorf("Could not read request body")
- response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err)
- httpSend(w, &response)
- return
- }
-
- c := codec.New(nil)
- var rpcReq shared.Request
- if err = c.Decode(payload, &rpcReq); err == nil {
- reply, err := a.Execute(&rpcReq)
- res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
- httpSend(w, &res)
- return
- }
-
- var reqBatch []shared.Request
- if err = c.Decode(payload, &reqBatch); err == nil {
- resBatch := make([]*interface{}, len(reqBatch))
- resCount := 0
-
- for i, rpcReq := range reqBatch {
- reply, err := a.Execute(&rpcReq)
- if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal
- resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err)
- resCount += 1
- }
- }
-
- // make response omitting nil entries
- resBatch = resBatch[:resCount]
- httpSend(w, resBatch)
- return
- }
-
- // invalid request
- err = fmt.Errorf("Could not decode request")
- res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err)
- httpSend(w, res)
- })
-}
diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go
index aff90cfaa0..432bf93b53 100644
--- a/rpc/comms/ipc_unix.go
+++ b/rpc/comms/ipc_unix.go
@@ -22,6 +22,7 @@ import (
"net"
"os"
+ "github.com/ethereum/go-ethereum/fdtrack"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc/codec"
@@ -50,15 +51,16 @@ func (self *ipcClient) reconnect() error {
func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error {
os.Remove(cfg.Endpoint) // in case it still exists from a previous run
- l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"})
+ l, err := net.Listen("unix", cfg.Endpoint)
if err != nil {
return err
}
+ l = fdtrack.WrapListener("ipc", l)
os.Chmod(cfg.Endpoint, 0600)
go func() {
for {
- conn, err := l.AcceptUnix()
+ conn, err := l.Accept()
if err != nil {
glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err)
continue