rpc: fix megacheck warnings

pull/14930/head
Egon Elbre 7 years ago committed by Felix Lange
parent 43437806fb
commit e063d538b8
  1. 23
      rpc/server.go
  2. 1
      rpc/subscription.go
  3. 16
      rpc/subscription_test.go
  4. 5
      rpc/types.go
  5. 27
      rpc/utils.go

@ -29,11 +29,7 @@ import (
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
const ( const MetadataApi = "rpc"
notificationBufferSize = 10000 // max buffered notifications before codec is closed
MetadataApi = "rpc"
)
// CodecOption specifies which type of messages this codec supports // CodecOption specifies which type of messages this codec supports
type CodecOption int type CodecOption int
@ -50,7 +46,6 @@ const (
func NewServer() *Server { func NewServer() *Server {
server := &Server{ server := &Server{
services: make(serviceRegistry), services: make(serviceRegistry),
subscriptions: make(subscriptionRegistry),
codecs: set.New(), codecs: set.New(),
run: 1, run: 1,
} }
@ -124,16 +119,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil return nil
} }
// hasOption returns true if option is included in options, otherwise false
func hasOption(option CodecOption, options []CodecOption) bool {
for _, o := range options {
if option == o {
return true
}
}
return false
}
// serveRequest will reads requests from the codec, calls the RPC callback and // serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec. // writes the response to the given codec.
// //
@ -148,13 +133,11 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
const size = 64 << 10 const size = 64 << 10
buf := make([]byte, size) buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)] buf = buf[:runtime.Stack(buf, false)]
log.Error(fmt.Sprint(string(buf))) log.Error(string(buf))
} }
s.codecsMu.Lock() s.codecsMu.Lock()
s.codecs.Remove(codec) s.codecs.Remove(codec)
s.codecsMu.Unlock() s.codecsMu.Unlock()
return
}() }()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -246,7 +229,7 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
// close all codecs which will cancel pending requests/subscriptions. // close all codecs which will cancel pending requests/subscriptions.
func (s *Server) Stop() { func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) { if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
log.Debug(fmt.Sprint("RPC Server shutdown initiatied")) log.Debug("RPC Server shutdown initiatied")
s.codecsMu.Lock() s.codecsMu.Lock()
defer s.codecsMu.Unlock() defer s.codecsMu.Unlock()
s.codecs.Each(func(c interface{}) bool { s.codecs.Each(func(c interface{}) bool {

@ -53,7 +53,6 @@ type notifierKey struct{}
type Notifier struct { type Notifier struct {
codec ServerCodec codec ServerCodec
subMu sync.RWMutex // guards active and inactive maps subMu sync.RWMutex // guards active and inactive maps
stopped bool
active map[ID]*Subscription active map[ID]*Subscription
inactive map[ID]*Subscription inactive map[ID]*Subscription
} }

@ -165,7 +165,7 @@ func TestNotifications(t *testing.T) {
} }
func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse, func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
failures chan<- jsonErrResponse, notifications chan<- jsonNotification) { failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) {
// read and parse server messages // read and parse server messages
for { for {
@ -177,12 +177,14 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
var responses []map[string]interface{} var responses []map[string]interface{}
if rmsg[0] == '[' { if rmsg[0] == '[' {
if err := json.Unmarshal(rmsg, &responses); err != nil { if err := json.Unmarshal(rmsg, &responses); err != nil {
t.Fatalf("Received invalid message: %s", rmsg) errors <- fmt.Errorf("Received invalid message: %s", rmsg)
return
} }
} else { } else {
var msg map[string]interface{} var msg map[string]interface{}
if err := json.Unmarshal(rmsg, &msg); err != nil { if err := json.Unmarshal(rmsg, &msg); err != nil {
t.Fatalf("Received invalid message: %s", rmsg) errors <- fmt.Errorf("Received invalid message: %s", rmsg)
return
} }
responses = append(responses, msg) responses = append(responses, msg)
} }
@ -216,7 +218,7 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
} }
continue continue
} }
t.Fatalf("Received invalid message: %s", msg) errors <- fmt.Errorf("Received invalid message: %s", msg)
} }
} }
} }
@ -235,6 +237,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
successes = make(chan jsonSuccessResponse) successes = make(chan jsonSuccessResponse)
failures = make(chan jsonErrResponse) failures = make(chan jsonErrResponse)
notifications = make(chan jsonNotification) notifications = make(chan jsonNotification)
errors = make(chan error, 10)
) )
// setup and start server // setup and start server
@ -248,7 +252,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
defer server.Stop() defer server.Stop()
// wait for message and write them to the given channels // wait for message and write them to the given channels
go waitForMessages(t, in, successes, failures, notifications) go waitForMessages(t, in, successes, failures, notifications, errors)
// create subscriptions one by one // create subscriptions one by one
n := 3 n := 3
@ -297,6 +301,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
} }
select { select {
case err := <-errors:
t.Fatal(err)
case suc := <-successes: // subscription created case suc := <-successes: // subscription created
subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string) subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
case failure := <-failures: case failure := <-failures:

@ -48,7 +48,6 @@ type callback struct {
// service represents a registered object // service represents a registered object
type service struct { type service struct {
name string // name for service name string // name for service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // receiver type typ reflect.Type // receiver type
callbacks callbacks // registered handlers callbacks callbacks // registered handlers
subscriptions subscriptions // available subscriptions/notifications subscriptions subscriptions // available subscriptions/notifications
@ -58,7 +57,6 @@ type service struct {
type serverRequest struct { type serverRequest struct {
id interface{} id interface{}
svcname string svcname string
rcvr reflect.Value
callb *callback callb *callback
args []reflect.Value args []reflect.Value
isUnsubscribe bool isUnsubscribe bool
@ -68,13 +66,10 @@ type serverRequest struct {
type serviceRegistry map[string]*service // collection of services type serviceRegistry map[string]*service // collection of services
type callbacks map[string]*callback // collection of RPC callbacks type callbacks map[string]*callback // collection of RPC callbacks
type subscriptions map[string]*callback // collection of subscription callbacks type subscriptions map[string]*callback // collection of subscription callbacks
type subscriptionRegistry map[string]*callback // collection of subscription callbacks
// Server represents a RPC server // Server represents a RPC server
type Server struct { type Server struct {
services serviceRegistry services serviceRegistry
muSubcriptions sync.Mutex // protects subscriptions
subscriptions subscriptionRegistry
run int32 run int32
codecsMu sync.Mutex codecsMu sync.Mutex

@ -119,21 +119,6 @@ func isHexNum(t reflect.Type) bool {
return t == bigIntType return t == bigIntType
} }
var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem()
// Indication if the given block is a BlockNumber
func isBlockNumber(t reflect.Type) bool {
if t == nil {
return false
}
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == blockNumberType
}
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria. // documentation for a summary of these criteria.
@ -210,19 +195,13 @@ METHODS:
} }
switch mtype.NumOut() { switch mtype.NumOut() {
case 0, 1: case 0, 1, 2:
break if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
case 2:
if h.errPos == -1 { // method must one return value and 1 error
continue METHODS
}
break
default:
continue METHODS continue METHODS
} }
callbacks[mname] = &h callbacks[mname] = &h
} }
}
return callbacks, subscriptions return callbacks, subscriptions
} }

Loading…
Cancel
Save