@ -21,6 +21,7 @@ import (
"fmt"
"io"
"reflect"
"strconv"
"strings"
"sync"
@ -40,14 +41,14 @@ const (
type JSONRequest struct {
Method string ` json:"method" `
Version string ` json:"jsonrpc" `
Id * int64 ` json:"id,omitempty" `
Id json . RawMessage ` json:"id,omitempty" `
Payload json . RawMessage ` json:"params,omitempty" `
}
// JSON-RPC response
type JSONSuccessResponse struct {
Version string ` json:"jsonrpc" `
Id int64 ` json:"id" `
Id interface { } ` json:"id,omitempty " `
Result interface { } ` json:"result" `
}
@ -61,7 +62,7 @@ type JSONError struct {
// JSON-RPC error response
type JSONErrResponse struct {
Version string ` json:"jsonrpc" `
Id * int64 ` json:"id,omitempty" `
Id interface { } ` json:"id,omitempty" `
Error JSONError ` json:"error" `
}
@ -78,16 +79,16 @@ type jsonNotification struct {
Params jsonSubscription ` json:"params" `
}
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments
// and serializing (result) objects.
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It
// also has support for parsing arguments a nd serializing (result) objects.
type jsonCodec struct {
closed chan interface { }
closer sync . Once
d * json . Decoder
muEncoder sync . Mutex
e * json . Encoder
req JSONRequest
rw io . ReadWriteCloser
closer sync . Once // close closed channel once
closed chan interface { } // closed on Close
decMu sync . Mutex // guards d
d * json . Decoder // decodes incoming requests
encMu sync . Mutex // guards e
e * json . Encoder // encodes responses
rw io . ReadWriteCloser // connection
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
@ -109,9 +110,13 @@ func isBatch(msg json.RawMessage) bool {
return false
}
// ReadRequestHeaders will read new requests without parsing the arguments. It will return a collection of requests, an
// indication if these requests are in batch form or an error when the incoming message could not be read/parsed.
// ReadRequestHeaders will read new requests without parsing the arguments. It will
// return a collection of requests, an indication if these requests are in batch
// form or an error when the incoming message could not be read/parsed.
func ( c * jsonCodec ) ReadRequestHeaders ( ) ( [ ] rpcRequest , bool , RPCError ) {
c . decMu . Lock ( )
defer c . decMu . Unlock ( )
var incomingMsg json . RawMessage
if err := c . d . Decode ( & incomingMsg ) ; err != nil {
return nil , false , & invalidRequestError { err . Error ( ) }
@ -124,21 +129,38 @@ func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
return parseRequest ( incomingMsg )
}
// parseRequest will parse a single request from the given RawMessage. It will return the parsed request, an indication
// if the request was a batch or an error when the request could not be parsed.
// checkReqId returns an error when the given reqId isn't valid for RPC method calls.
// valid id's are strings, numbers or null
func checkReqId ( reqId json . RawMessage ) error {
if len ( reqId ) == 0 {
return fmt . Errorf ( "missing request id" )
}
if _ , err := strconv . ParseFloat ( string ( reqId ) , 64 ) ; err == nil {
return nil
}
var str string
if err := json . Unmarshal ( reqId , & str ) ; err == nil {
return nil
}
return fmt . Errorf ( "invalid request id" )
}
// parseRequest will parse a single request from the given RawMessage. It will return
// the parsed request, an indication if the request was a batch or an error when
// the request could not be parsed.
func parseRequest ( incomingMsg json . RawMessage ) ( [ ] rpcRequest , bool , RPCError ) {
var in JSONRequest
if err := json . Unmarshal ( incomingMsg , & in ) ; err != nil {
return nil , false , & invalidMessageError { err . Error ( ) }
}
if in . Id == nil {
return nil , false , & invalidMessageError { "Server cannot handle notifications" }
if err := checkReqId ( in . Id ) ; err ! = nil {
return nil , false , & invalidMessageError { err . Error ( ) }
}
// subscribe are special, they will always use `subscribeMethod` as service method
// subscribe are special, they will always use `subscribeMethod` as first param in the payloa d
if in . Method == subscribeMethod {
reqs := [ ] rpcRequest { rpcRequest { id : * in . Id , isPubSub : true } }
reqs := [ ] rpcRequest { rpcRequest { id : & in . Id , isPubSub : true } }
if len ( in . Payload ) > 0 {
// first param must be subscription name
var subscribeMethod [ 1 ] string
@ -156,7 +178,7 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
}
if in . Method == unsubscribeMethod {
return [ ] rpcRequest { rpcRequest { id : * in . Id , isPubSub : true ,
return [ ] rpcRequest { rpcRequest { id : & in . Id , isPubSub : true ,
method : unsubscribeMethod , params : in . Payload } } , false , nil
}
@ -167,10 +189,10 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
}
if len ( in . Payload ) == 0 {
return [ ] rpcRequest { rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : * in . Id } } , false , nil
return [ ] rpcRequest { rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : & in . Id } } , false , nil
}
return [ ] rpcRequest { rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : * in . Id , params : in . Payload } } , false , nil
return [ ] rpcRequest { rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : & in . Id , params : in . Payload } } , false , nil
}
// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
@ -183,14 +205,17 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
requests := make ( [ ] rpcRequest , len ( in ) )
for i , r := range in {
if r . Id = = nil {
return nil , tru e, & invalidMessageError { "Server cannot handle notifications" }
if err := checkReqId ( r . Id ) ; err ! = nil {
return nil , fals e, & invalidMessageError { err . Error ( ) }
}
// (un)subscribe are special, they will always use the same service.method
id := & in [ i ] . Id
// subscribe are special, they will always use `subscribeMethod` as first param in the payload
if r . Method == subscribeMethod {
requests [ i ] = rpcRequest { id : * r . Id , isPubSub : true }
requests [ i ] = rpcRequest { id : i d, isPubSub : true }
if len ( r . Payload ) > 0 {
// first param must be subscription name
var subscribeMethod [ 1 ] string
if err := json . Unmarshal ( r . Payload , & subscribeMethod ) ; err != nil {
glog . V ( logger . Debug ) . Infof ( "Unable to parse subscription method: %v\n" , err )
@ -207,7 +232,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
}
if r . Method == unsubscribeMethod {
requests [ i ] = rpcRequest { id : * r . I d, isPubSub : true , method : unsubscribeMethod , params : r . Payload }
requests [ i ] = rpcRequest { id : i d, isPubSub : true , method : unsubscribeMethod , params : r . Payload }
continue
}
@ -217,9 +242,9 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
}
if len ( r . Payload ) == 0 {
requests [ i ] = rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : * r . I d, params : nil }
requests [ i ] = rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : i d, params : nil }
} else {
requests [ i ] = rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : * r . I d, params : r . Payload }
requests [ i ] = rpcRequest { service : elems [ 0 ] , method : elems [ 1 ] , id : i d, params : r . Payload }
}
}
@ -236,58 +261,38 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf
}
}
func countArguments ( args json . RawMessage ) ( int , error ) {
var cnt [ ] interface { }
if err := json . Unmarshal ( args , & cnt ) ; err != nil {
return - 1 , nil
}
return len ( cnt ) , nil
// parsePositionalArguments tries to parse the given args to an array of values with the given types.
// It returns the parsed values or an error when the args could not be parsed. Missing optional arguments
// are returned as reflect.Zero values.
func parsePositionalArguments ( args json . RawMessage , callbackArgs [ ] reflect . Type ) ( [ ] reflect . Value , RPCError ) {
params := make ( [ ] interface { } , 0 , len ( callbackArgs ) )
for _ , t := range callbackArgs {
params = append ( params , reflect . New ( t ) . Interface ( ) )
}
// parsePositionalArguments tries to parse the given args to an array of values with the given types. It returns the
// parsed values or an error when the args could not be parsed.
func parsePositionalArguments ( args json . RawMessage , argTypes [ ] reflect . Type ) ( [ ] reflect . Value , RPCError ) {
argValues := make ( [ ] reflect . Value , len ( argTypes ) )
params := make ( [ ] interface { } , len ( argTypes ) )
n , err := countArguments ( args )
if err != nil {
if err := json . Unmarshal ( args , & params ) ; err != nil {
return nil , & invalidParamsError { err . Error ( ) }
}
if n != len ( argTypes ) {
return nil , & invalidParamsError { fmt . Sprintf ( "insufficient params, want %d have %d" , len ( argTypes ) , n ) }
}
for i , t := range argTypes {
if t . Kind ( ) == reflect . Ptr {
// values must be pointers for the Unmarshal method, reflect.
// Dereference otherwise reflect.New would create **SomeType
argValues [ i ] = reflect . New ( t . Elem ( ) )
params [ i ] = argValues [ i ] . Interface ( )
// when not specified blockNumbers are by default latest (-1)
if blockNumber , ok := params [ i ] . ( * BlockNumber ) ; ok {
* blockNumber = BlockNumber ( - 1 )
if len ( params ) > len ( callbackArgs ) {
return nil , & invalidParamsError { fmt . Sprintf ( "too many params, want %d got %d" , len ( callbackArgs ) , len ( params ) ) }
}
} else {
argValues [ i ] = reflect . New ( t )
params [ i ] = argValues [ i ] . Interface ( )
// when not specified blockNumbers are by default latest (-1)
if blockNumber , ok := params [ i ] . ( * BlockNumber ) ; ok {
* blockNumber = BlockNumber ( - 1 )
}
}
// assume missing params are null values
for i := len ( params ) ; i < len ( callbackArgs ) ; i ++ {
params = append ( params , nil )
}
if err := json . Unmarshal ( args , & params ) ; err != nil {
return nil , & invalidParamsError { err . Error ( ) }
argValues := make ( [ ] reflect . Value , len ( params ) )
for i , p := range params {
// verify that JSON null values are only supplied for optional arguments (ptr types)
if p == nil && callbackArgs [ i ] . Kind ( ) != reflect . Ptr {
return nil , & invalidParamsError { fmt . Sprintf ( "invalid or missing value for params[%d]" , i ) }
}
// Convert pointers back to values where necessary
for i , a := range argValues {
if a . Kind ( ) != argTypes [ i ] . Kind ( ) {
argValues [ i ] = reflect . Indirect ( argValues [ i ] )
if p == nil {
argValues [ i ] = reflect . Zero ( callbackArgs [ i ] )
} else { // deref pointers values creates previously with reflect.New
argValues [ i ] = reflect . ValueOf ( p ) . Elem ( )
}
}
@ -295,7 +300,7 @@ func parsePositionalArguments(args json.RawMessage, argTypes []reflect.Type) ([]
}
// CreateResponse will create a JSON-RPC success response with the given id and reply as result.
func ( c * jsonCodec ) CreateResponse ( id int64 , reply interface { } ) interface { } {
func ( c * jsonCodec ) CreateResponse ( id interface { } , reply interface { } ) interface { } {
if isHexNum ( reflect . TypeOf ( reply ) ) {
return & JSONSuccessResponse { Version : jsonRPCVersion , Id : id , Result : fmt . Sprintf ( ` %#x ` , reply ) }
}
@ -303,13 +308,13 @@ func (c *jsonCodec) CreateResponse(id int64, reply interface{}) interface{} {
}
// CreateErrorResponse will create a JSON-RPC error response with the given id and error.
func ( c * jsonCodec ) CreateErrorResponse ( id * int64 , err RPCError ) interface { } {
func ( c * jsonCodec ) CreateErrorResponse ( id interface { } , err RPCError ) interface { } {
return & JSONErrResponse { Version : jsonRPCVersion , Id : id , Error : JSONError { Code : err . Code ( ) , Message : err . Error ( ) } }
}
// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
// info is optional and contains additional information about the error. When an empty string is passed it is ignored.
func ( c * jsonCodec ) CreateErrorResponseWithInfo ( id * int64 , err RPCError , info interface { } ) interface { } {
func ( c * jsonCodec ) CreateErrorResponseWithInfo ( id interface { } , err RPCError , info interface { } ) interface { } {
return & JSONErrResponse { Version : jsonRPCVersion , Id : id ,
Error : JSONError { Code : err . Code ( ) , Message : err . Error ( ) , Data : info } }
}
@ -327,8 +332,8 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac
// Write message to client
func ( c * jsonCodec ) Write ( res interface { } ) error {
c . muEncoder . Lock ( )
defer c . muEncoder . Unlock ( )
c . encMu . Lock ( )
defer c . encMu . Unlock ( )
return c . e . Encode ( res )
}