From 464660651ddf7e8938a0fbb03f140502180a8062 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 4 Aug 2016 02:10:44 +0200 Subject: [PATCH 1/3] rpc: don't exceed context deadline while waiting for send lock --- rpc/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rpc/client.go b/rpc/client.go index 4ff9a8cb9a..0c52402ea6 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -398,6 +398,10 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error err := c.write(ctx, msg) c.sendDone <- err return err + case <-ctx.Done(): + // This can happen if the client is overloaded or unable to keep up with + // subscription notifications. + return ctx.Err() case <-c.didQuit: return ErrClientQuit } From f5f042ffdc9fed3094b86f3dbbc85bb63a4f9537 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 4 Aug 2016 21:18:13 +0200 Subject: [PATCH 2/3] rpc: ensure client doesn't block for slow subscribers I initially made the client block if the 100-element buffer was exceeded. It turns out that this is inconvenient for simple uses of the client which subscribe and perform calls on the same goroutine, e.g. client, _ := rpc.Dial(...) ch := make(chan int) // note: no buffer sub, _ := client.EthSubscribe(ch, "something") for event := range ch { client.Call(...) } This innocent looking code will lock up if the server suddenly decides to send 2000 notifications. In this case, the client's main loop won't accept the call because it is trying to deliver a notification to ch. The issue is kind of hard to explain in the docs and few people will actually read them. Buffering is the simple option and works with close to no overhead for subscribers that always listen. --- rpc/client.go | 92 +++++++++++++++++++++++++++------------- rpc/client_test.go | 51 ++++++++++++++++++++++ rpc/notification_test.go | 4 ++ 3 files changed, 117 insertions(+), 30 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 0c52402ea6..6846e1ddaf 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -18,6 +18,7 @@ package rpc import ( "bytes" + "container/list" "encoding/json" "errors" "fmt" @@ -35,16 +36,31 @@ import ( ) var ( - ErrClientQuit = errors.New("client is closed") - ErrNoResult = errors.New("no result in JSON-RPC response") + ErrClientQuit = errors.New("client is closed") + ErrNoResult = errors.New("no result in JSON-RPC response") + ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") ) const ( - clientSubscriptionBuffer = 100 // if exceeded, the client stops reading - tcpKeepAliveInterval = 30 * time.Second - defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline - defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline - subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls + // Timeouts + tcpKeepAliveInterval = 30 * time.Second + defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline + defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) + +const ( + // Subscriptions are removed when the subscriber cannot keep up. + // + // This can be worked around by supplying a channel with sufficiently sized buffer, + // but this can be inconvenient and hard to explain in the docs. Another issue with + // buffered channels is that the buffer is static even though it might not be needed + // most of the time. + // + // The approach taken here is to maintain a per-subscription linked list buffer + // shrinks on demand. If the buffer reaches the size below, the subscription is + // dropped. + maxClientSubscriptionBuffer = 8000 ) // BatchElem is an element in a batch request. @@ -276,9 +292,9 @@ func (c *Client) BatchCall(b []BatchElem) error { // to return a response for all of them. The wait duration is bounded by the // context's deadline. // -// In contrast to CallContext, BatchCallContext only returns I/O errors. Any -// error specific to a request is reported through the Error field of the -// corresponding BatchElem. +// In contrast to CallContext, BatchCallContext only returns errors that have occurred +// while sending the request. Any error specific to a request is reported through the +// Error field of the corresponding BatchElem. // // Note that batch calls may not be executed atomically on the server side. func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { @@ -338,11 +354,11 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { // sent to the given channel. The element type of the channel must match the // expected type of content returned by the subscription. // -// Callers should not use the same channel for multiple calls to EthSubscribe. -// The channel is closed when the notification is unsubscribed or an error -// occurs. The error can be retrieved via the Err method of the subscription. // -// Slow subscribers will block the clients ingress path eventually. +// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications +// before considering the subscriber dead. The subscription Err channel will receive +// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure +// that the channel usually has at least one reader to prevent this issue. func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) { // Check type of channel first. chanVal := reflect.ValueOf(channel) @@ -657,8 +673,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription channel: channel, quit: make(chan struct{}), err: make(chan error, 1), - // in is buffered so dispatch can continue even if the subscriber is slow. - in: make(chan json.RawMessage, clientSubscriptionBuffer), + in: make(chan json.RawMessage), } return sub } @@ -684,13 +699,16 @@ func (sub *ClientSubscription) Unsubscribe() { func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { sub.quitOnce.Do(func() { + // The dispatch loop won't be able to execute the unsubscribe call + // if it is blocked on deliver. Close sub.quit first because it + // unblocks deliver. + close(sub.quit) if unsubscribeServer { sub.requestUnsubscribe() } if err != nil { sub.err <- err } - close(sub.quit) }) } @@ -710,32 +728,46 @@ func (sub *ClientSubscription) start() { func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { cases := []reflect.SelectCase{ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, {Dir: reflect.SelectSend, Chan: sub.channel}, } + buffer := list.New() + defer buffer.Init() for { - select { - case result := <-sub.in: - val, err := sub.unmarshal(result) + var chosen int + var recv reflect.Value + if buffer.Len() == 0 { + // Idle, omit send case. + chosen, recv, _ = reflect.Select(cases[:2]) + } else { + // Non-empty buffer, send the first queued item. + cases[2].Send = reflect.ValueOf(buffer.Front().Value) + chosen, recv, _ = reflect.Select(cases) + } + + switch chosen { + case 0: // <-sub.quit + return nil, false + case 1: // <-sub.in + val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) if err != nil { return err, true } - cases[1].Send = val - switch chosen, _, _ := reflect.Select(cases); chosen { - case 0: // <-sub.quit - return nil, false - case 1: // sub.channel<- - continue + if buffer.Len() == maxClientSubscriptionBuffer { + return ErrSubscriptionQueueOverflow, true } - case <-sub.quit: - return nil, false + buffer.PushBack(val) + case 2: // sub.channel<- + cases[2].Send = reflect.Value{} // Don't hold onto the value. + buffer.Remove(buffer.Front()) } } } -func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) { +func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { val := reflect.New(sub.etype) err := json.Unmarshal(result, val.Interface()) - return val.Elem(), err + return val.Elem().Interface(), err } func (sub *ClientSubscription) requestUnsubscribe() error { diff --git a/rpc/client_test.go b/rpc/client_test.go index 58dceada06..424d7f5bc0 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -296,6 +296,57 @@ func TestClientSubscribeClose(t *testing.T) { } } +// This test checks that Client doesn't lock up when a single subscriber +// doesn't read subscription events. +func TestClientNotificationStorm(t *testing.T) { + server := newTestServer("eth", new(NotificationTestService)) + defer server.Stop() + + doTest := func(count int, wantError bool) { + client := DialInProc(server) + defer client.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Subscribe on the server. It will start sending many notifications + // very quickly. + nc := make(chan int) + sub, err := client.EthSubscribe(nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + defer sub.Unsubscribe() + + // Process each notification, try to run a call in between each of them. + for i := 0; i < count; i++ { + select { + case val := <-nc: + if val != i { + t.Fatalf("(%d/%d) unexpected value %d", i, count, val) + } + case err := <-sub.Err(): + if wantError && err != ErrSubscriptionQueueOverflow { + t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow) + } else if !wantError { + t.Fatalf("(%d/%d) got unexpected error %q", i, count, err) + } + return + } + var r int + err := client.CallContext(ctx, &r, "eth_echo", i) + if err != nil { + if !wantError { + t.Fatalf("(%d/%d) call error: %v", i, count, err) + } + return + } + } + } + + doTest(8000, false) + doTest(10000, true) +} + func TestClientHTTP(t *testing.T) { server := newTestServer("service", new(Service)) defer server.Stop() diff --git a/rpc/notification_test.go b/rpc/notification_test.go index 2805032225..52352848c4 100644 --- a/rpc/notification_test.go +++ b/rpc/notification_test.go @@ -34,6 +34,10 @@ type NotificationTestService struct { unblockHangSubscription chan struct{} } +func (s *NotificationTestService) Echo(i int) int { + return i +} + func (s *NotificationTestService) wasUnsubCallbackCalled() bool { s.mu.Lock() defer s.mu.Unlock() From e32925397b3110f6dd5e18c79f6641bd2a55776f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 5 Aug 2016 13:24:48 +0200 Subject: [PATCH 3/3] rpc: add context argument to EthSubscribe It's inconsistent not to pass it and most callers will work with contexts anyway. --- rpc/client.go | 6 ++-- rpc/client_example_test.go | 57 +++++++++++++++++++++----------------- rpc/client_test.go | 8 +++--- 3 files changed, 38 insertions(+), 33 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 6846e1ddaf..34a3b78317 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -354,12 +354,14 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { // sent to the given channel. The element type of the channel must match the // expected type of content returned by the subscription. // +// The context argument cancels the RPC request that sets up the subscription but has no +// effect on the subscription after EthSubscribe has returned. // // Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications // before considering the subscriber dead. The subscription Err channel will receive // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure // that the channel usually has at least one reader to prevent this issue. -func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) { +func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { // Check type of channel first. chanVal := reflect.ValueOf(channel) if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { @@ -381,8 +383,6 @@ func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*Client resp: make(chan *jsonrpcMessage), sub: newClientSubscription(c, chanVal), } - ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) - defer cancel() // Send the subscription request. // The arrival and validity of the response is signaled on sub.quit. diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go index 84b4b67bbd..3462b3685b 100644 --- a/rpc/client_example_test.go +++ b/rpc/client_example_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) // In this example, our client whishes to track the latest 'block number' @@ -41,7 +42,16 @@ func ExampleClientSubscription() { // Connect the client. client, _ := rpc.Dial("ws://127.0.0.1:8485") subch := make(chan Block) - go subscribeBlocks(client, subch) + + // Ensure that subch receives the latest block. + go func() { + for i := 0; ; i++ { + if i > 0 { + time.Sleep(2 * time.Second) + } + subscribeBlocks(client, subch) + } + }() // Print events from the subscription as they arrive. for block := range subch { @@ -52,32 +62,27 @@ func ExampleClientSubscription() { // subscribeBlocks runs in its own goroutine and maintains // a subscription for new blocks. func subscribeBlocks(client *rpc.Client, subch chan Block) { - for i := 0; ; i++ { - if i > 0 { - time.Sleep(2 * time.Second) - } - - // Subscribe to new blocks. - sub, err := client.EthSubscribe(subch, "newBlocks") - if err == rpc.ErrClientQuit { - return // Stop reconnecting if the client was closed. - } else if err != nil { - fmt.Println("subscribe error:", err) - continue - } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - // The connection is established now. - // Update the channel with the current block. - var lastBlock Block - if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil { - fmt.Println("can't get latest block:", err) - continue - } - subch <- lastBlock + // Subscribe to new blocks. + sub, err := client.EthSubscribe(ctx, subch, "newBlocks") + if err != nil { + fmt.Println("subscribe error:", err) + return + } - // The subscription will deliver events to the channel. Wait for the - // subscription to end for any reason, then loop around to re-establish - // the connection. - fmt.Println("connection lost: ", <-sub.Err()) + // The connection is established now. + // Update the channel with the current block. + var lastBlock Block + if err := client.CallContext(ctx, &lastBlock, "eth_getBlockByNumber", "latest"); err != nil { + fmt.Println("can't get latest block:", err) + return } + subch <- lastBlock + + // The subscription will deliver events to the channel. Wait for the + // subscription to end for any reason, then loop around to re-establish + // the connection. + fmt.Println("connection lost: ", <-sub.Err()) } diff --git a/rpc/client_test.go b/rpc/client_test.go index 424d7f5bc0..476c8c6f36 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -215,7 +215,7 @@ func TestClientSubscribeInvalidArg(t *testing.T) { t.Error(string(buf)) } }() - client.EthSubscribe(arg, "foo_bar") + client.EthSubscribe(context.Background(), arg, "foo_bar") } check(true, nil) check(true, 1) @@ -233,7 +233,7 @@ func TestClientSubscribe(t *testing.T) { nc := make(chan int) count := 10 - sub, err := client.EthSubscribe(nc, "someSubscription", count, 0) + sub, err := client.EthSubscribe(context.Background(), nc, "someSubscription", count, 0) if err != nil { t.Fatal("can't subscribe:", err) } @@ -275,7 +275,7 @@ func TestClientSubscribeClose(t *testing.T) { err error ) go func() { - sub, err = client.EthSubscribe(nc, "hangSubscription", 999) + sub, err = client.EthSubscribe(context.Background(), nc, "hangSubscription", 999) errc <- err }() @@ -311,7 +311,7 @@ func TestClientNotificationStorm(t *testing.T) { // Subscribe on the server. It will start sending many notifications // very quickly. nc := make(chan int) - sub, err := client.EthSubscribe(nc, "someSubscription", count, 0) + sub, err := client.EthSubscribe(ctx, nc, "someSubscription", count, 0) if err != nil { t.Fatal("can't subscribe:", err) }