support for large requests/responses

pull/1332/head
Bas van Kervel 10 years ago
parent 6d92fdc0df
commit ffbe5656ff
  1. 46
      rpc/codec/json.go
  2. 1
      rpc/comms/ipc.go
  3. 2
      rpc/comms/ipc_unix.go

@ -2,28 +2,30 @@ package codec
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net" "net"
"time"
"github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/shared"
) )
const ( const (
MAX_REQUEST_SIZE = 1024 * 1024 MAX_REQUEST_SIZE = 1024 * 1024
MAX_RESPONSE_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024
) )
// Json serialization support // Json serialization support
type JsonCodec struct { type JsonCodec struct {
c net.Conn c net.Conn
buffer []byte buffer []byte
bytesInBuffer int bytesInBuffer int
} }
// Create new JSON coder instance // Create new JSON coder instance
func NewJsonCoder(conn net.Conn) ApiCoder { func NewJsonCoder(conn net.Conn) ApiCoder {
return &JsonCodec{ return &JsonCodec{
c: conn, c: conn,
buffer: make([]byte, MAX_REQUEST_SIZE), buffer: make([]byte, MAX_REQUEST_SIZE),
bytesInBuffer: 0, bytesInBuffer: 0,
} }
} }
@ -58,28 +60,40 @@ func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool,
} }
func (self *JsonCodec) ReadResponse() (interface{}, error) { func (self *JsonCodec) ReadResponse() (interface{}, error) {
var err error bytesInBuffer := 0
buf := make([]byte, MAX_RESPONSE_SIZE) buf := make([]byte, MAX_RESPONSE_SIZE)
n, _ := self.c.Read(buf)
var failure shared.ErrorResponse deadline := time.Now().Add(15 * time.Second)
if err = json.Unmarshal(buf[:n], &failure); err == nil && failure.Error != nil { self.c.SetDeadline(deadline)
return failure, nil
} for {
n, err := self.c.Read(buf[bytesInBuffer:])
if err != nil {
return nil, err
}
bytesInBuffer += n
var success shared.SuccessResponse var success shared.SuccessResponse
if err = json.Unmarshal(buf[:n], &success); err == nil { if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil {
return success, nil return success, nil
}
var failure shared.ErrorResponse
if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil {
return failure, nil
}
} }
return nil, err self.c.Close()
return nil, fmt.Errorf("Unable to read response")
} }
// Encode response to encoded form in underlying stream // Decode data
func (self *JsonCodec) Decode(data []byte, msg interface{}) error { func (self *JsonCodec) Decode(data []byte, msg interface{}) error {
return json.Unmarshal(data, msg) return json.Unmarshal(data, msg)
} }
// Encode message
func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) {
return json.Marshal(msg) return json.Marshal(msg)
} }

@ -16,6 +16,7 @@ type IpcConfig struct {
type ipcClient struct { type ipcClient struct {
endpoint string endpoint string
c net.Conn
codec codec.Codec codec codec.Codec
coder codec.ApiCoder coder codec.ApiCoder
} }

@ -18,7 +18,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
return nil, err return nil, err
} }
return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil
} }
func (self *ipcClient) reconnect() error { func (self *ipcClient) reconnect() error {

Loading…
Cancel
Save