diff --git a/rpc/server.go b/rpc/server.go index 62b84af34e..30c288349e 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -29,11 +29,7 @@ import ( "gopkg.in/fatih/set.v0" ) -const ( - notificationBufferSize = 10000 // max buffered notifications before codec is closed - - MetadataApi = "rpc" -) +const MetadataApi = "rpc" // CodecOption specifies which type of messages this codec supports type CodecOption int @@ -49,10 +45,9 @@ const ( // NewServer will create a new server instance with no registered handlers. func NewServer() *Server { server := &Server{ - services: make(serviceRegistry), - subscriptions: make(subscriptionRegistry), - codecs: set.New(), - run: 1, + services: make(serviceRegistry), + codecs: set.New(), + run: 1, } // register a default service which will provide meta information about the RPC service such as the services and @@ -124,16 +119,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return nil } -// hasOption returns true if option is included in options, otherwise false -func hasOption(option CodecOption, options []CodecOption) bool { - for _, o := range options { - if option == o { - return true - } - } - return false -} - // serveRequest will reads requests from the codec, calls the RPC callback and // writes the response to the given codec. // @@ -148,13 +133,11 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] - log.Error(fmt.Sprint(string(buf))) + log.Error(string(buf)) } s.codecsMu.Lock() s.codecs.Remove(codec) s.codecsMu.Unlock() - - return }() ctx, cancel := context.WithCancel(context.Background()) @@ -246,7 +229,7 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { // close all codecs which will cancel pending requests/subscriptions. func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { - log.Debug(fmt.Sprint("RPC Server shutdown initiatied")) + log.Debug("RPC Server shutdown initiatied") s.codecsMu.Lock() defer s.codecsMu.Unlock() s.codecs.Each(func(c interface{}) bool { diff --git a/rpc/subscription.go b/rpc/subscription.go index 720e4dd06b..6ce7befa1d 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -53,7 +53,6 @@ type notifierKey struct{} type Notifier struct { codec ServerCodec subMu sync.RWMutex // guards active and inactive maps - stopped bool active map[ID]*Subscription inactive map[ID]*Subscription } diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index 0ed15ddfe8..39f7596923 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -165,7 +165,7 @@ func TestNotifications(t *testing.T) { } func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse, - failures chan<- jsonErrResponse, notifications chan<- jsonNotification) { + failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) { // read and parse server messages for { @@ -177,12 +177,14 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces var responses []map[string]interface{} if rmsg[0] == '[' { if err := json.Unmarshal(rmsg, &responses); err != nil { - t.Fatalf("Received invalid message: %s", rmsg) + errors <- fmt.Errorf("Received invalid message: %s", rmsg) + return } } else { var msg map[string]interface{} if err := json.Unmarshal(rmsg, &msg); err != nil { - t.Fatalf("Received invalid message: %s", rmsg) + errors <- fmt.Errorf("Received invalid message: %s", rmsg) + return } responses = append(responses, msg) } @@ -216,7 +218,7 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces } continue } - t.Fatalf("Received invalid message: %s", msg) + errors <- fmt.Errorf("Received invalid message: %s", msg) } } } @@ -235,6 +237,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) { successes = make(chan jsonSuccessResponse) failures = make(chan jsonErrResponse) notifications = make(chan jsonNotification) + + errors = make(chan error, 10) ) // setup and start server @@ -248,7 +252,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) { defer server.Stop() // wait for message and write them to the given channels - go waitForMessages(t, in, successes, failures, notifications) + go waitForMessages(t, in, successes, failures, notifications, errors) // create subscriptions one by one n := 3 @@ -297,6 +301,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) { } select { + case err := <-errors: + t.Fatal(err) case suc := <-successes: // subscription created subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string) case failure := <-failures: diff --git a/rpc/types.go b/rpc/types.go index a7b8c9788c..f2375604ed 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -48,7 +48,6 @@ type callback struct { // service represents a registered object type service struct { name string // name for service - rcvr reflect.Value // receiver of methods for the service typ reflect.Type // receiver type callbacks callbacks // registered handlers subscriptions subscriptions // available subscriptions/notifications @@ -58,23 +57,19 @@ type service struct { type serverRequest struct { id interface{} svcname string - rcvr reflect.Value callb *callback args []reflect.Value isUnsubscribe bool err Error } -type serviceRegistry map[string]*service // collection of services -type callbacks map[string]*callback // collection of RPC callbacks -type subscriptions map[string]*callback // collection of subscription callbacks -type subscriptionRegistry map[string]*callback // collection of subscription callbacks +type serviceRegistry map[string]*service // collection of services +type callbacks map[string]*callback // collection of RPC callbacks +type subscriptions map[string]*callback // collection of subscription callbacks // Server represents a RPC server type Server struct { - services serviceRegistry - muSubcriptions sync.Mutex // protects subscriptions - subscriptions subscriptionRegistry + services serviceRegistry run int32 codecsMu sync.Mutex diff --git a/rpc/utils.go b/rpc/utils.go index 2506c48331..9315cab591 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -119,21 +119,6 @@ func isHexNum(t reflect.Type) bool { return t == bigIntType } -var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem() - -// Indication if the given block is a BlockNumber -func isBlockNumber(t reflect.Type) bool { - if t == nil { - return false - } - - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - - return t == blockNumberType -} - // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server // documentation for a summary of these criteria. @@ -210,18 +195,12 @@ METHODS: } switch mtype.NumOut() { - case 0, 1: - break - case 2: - if h.errPos == -1 { // method must one return value and 1 error + case 0, 1, 2: + if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error continue METHODS } - break - default: - continue METHODS + callbacks[mname] = &h } - - callbacks[mname] = &h } return callbacks, subscriptions