rpc: ensure client doesn't block for slow subscribers

I initially made the client block if the 100-element buffer was
exceeded. It turns out that this is inconvenient for simple uses of the
client that subscribe and perform calls on the same goroutine, e.g.

    client, _ := rpc.Dial(...)
    ch := make(chan int) // note: no buffer
    sub, _ := client.EthSubscribe(ch, "something")
    for event := range ch {
        client.Call(...)
    }

This innocent-looking code locks up if the server suddenly decides to
send 2000 notifications: the client's main loop can't accept the call
because it is still trying to deliver a notification to ch.

The issue is hard to explain in the docs, and few people would actually
read that explanation anyway. Buffering is the simpler option: it adds
close to no overhead for subscribers that always listen.
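
For illustration only (not part of this commit), a hedged variant of the example
above that follows the new guidance: give the channel a buffer and watch the
subscription's Err channel. The endpoint, buffer size, and method name are
placeholders.

    client, _ := rpc.Dial("ws://127.0.0.1:8546") // placeholder endpoint
    ch := make(chan int, 256) // buffer absorbs bursts while a call is in flight
    sub, _ := client.EthSubscribe(ch, "something")
    defer sub.Unsubscribe()
    for {
        select {
        case event := <-ch:
            var result string
            client.Call(&result, "some_method", event) // placeholder method
        case err := <-sub.Err():
            // With this change the subscription is dropped instead of blocking
            // the whole client; err is ErrSubscriptionQueueOverflow in that case.
            log.Println("subscription ended:", err)
            return
        }
    }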
commit f5f042ffdc (pull/2891/head)
parent 464660651d
author Felix Lange
rpc/client.go            | 92
rpc/client_test.go       | 51
rpc/notification_test.go |  4

--- a/rpc/client.go
+++ b/rpc/client.go
@@ -18,6 +18,7 @@ package rpc
 
 import (
 	"bytes"
+	"container/list"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -35,16 +36,31 @@ import (
 )
 
 var (
 	ErrClientQuit = errors.New("client is closed")
 	ErrNoResult = errors.New("no result in JSON-RPC response")
+	ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
 )
 
 const (
-	clientSubscriptionBuffer = 100 // if exceeded, the client stops reading
+	// Timeouts
 	tcpKeepAliveInterval = 30 * time.Second
 	defaultDialTimeout   = 10 * time.Second // used when dialing if the context has no deadline
 	defaultWriteTimeout  = 10 * time.Second // used for calls if the context has no deadline
 	subscribeTimeout     = 5 * time.Second  // overall timeout eth_subscribe, rpc_modules calls
+)
+
+const (
+	// Subscriptions are removed when the subscriber cannot keep up.
+	//
+	// This can be worked around by supplying a channel with a sufficiently sized buffer,
+	// but this can be inconvenient and hard to explain in the docs. Another issue with
+	// buffered channels is that the buffer is static even though it might not be needed
+	// most of the time.
+	//
+	// The approach taken here is to maintain a per-subscription linked-list buffer that
+	// shrinks on demand. If the buffer reaches the size below, the subscription is
+	// dropped.
+	maxClientSubscriptionBuffer = 8000
 )
 
 // BatchElem is an element in a batch request.
@@ -276,9 +292,9 @@ func (c *Client) BatchCall(b []BatchElem) error {
 // to return a response for all of them. The wait duration is bounded by the
 // context's deadline.
 //
-// In contrast to CallContext, BatchCallContext only returns I/O errors. Any
-// error specific to a request is reported through the Error field of the
-// corresponding BatchElem.
+// In contrast to CallContext, BatchCallContext only returns errors that have occurred
+// while sending the request. Any error specific to a request is reported through the
+// Error field of the corresponding BatchElem.
 //
 // Note that batch calls may not be executed atomically on the server side.
 func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
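
A hedged usage sketch of the behaviour described in the updated comment: the error
returned by BatchCallContext only covers sending the batch, so per-request failures
still have to be read from each BatchElem. The endpoint is a placeholder.

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/ethereum/go-ethereum/rpc"
    )

    func main() {
        client, err := rpc.Dial("ws://127.0.0.1:8546") // placeholder endpoint
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        var blockNumber, gasPrice string
        batch := []rpc.BatchElem{
            {Method: "eth_blockNumber", Result: &blockNumber},
            {Method: "eth_gasPrice", Result: &gasPrice},
        }
        // A non-nil error here means the batch could not be sent at all.
        if err := client.BatchCallContext(context.Background(), batch); err != nil {
            log.Fatal("batch send failed: ", err)
        }
        // Per-request failures are reported on the individual elements.
        for _, elem := range batch {
            if elem.Error != nil {
                fmt.Printf("%s failed: %v\n", elem.Method, elem.Error)
            }
        }
        fmt.Println("block:", blockNumber, "gas price:", gasPrice)
    }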
@@ -338,11 +354,11 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
 // sent to the given channel. The element type of the channel must match the
 // expected type of content returned by the subscription.
 //
-// Callers should not use the same channel for multiple calls to EthSubscribe.
-// The channel is closed when the notification is unsubscribed or an error
-// occurs. The error can be retrieved via the Err method of the subscription.
 //
-// Slow subscribers will block the clients ingress path eventually.
+// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
+// before considering the subscriber dead. The subscription Err channel will receive
+// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
+// that the channel usually has at least one reader to prevent this issue.
 func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) {
@@ -657,8 +673,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription
 		channel: channel,
 		quit: make(chan struct{}),
 		err: make(chan error, 1),
-		// in is buffered so dispatch can continue even if the subscriber is slow.
-		in: make(chan json.RawMessage, clientSubscriptionBuffer),
+		in: make(chan json.RawMessage),
 	}
 	return sub
 }
@@ -684,13 +699,16 @@ func (sub *ClientSubscription) Unsubscribe() {
 func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
 	sub.quitOnce.Do(func() {
+		// The dispatch loop won't be able to execute the unsubscribe call
+		// if it is blocked on deliver. Close sub.quit first because it
+		// unblocks deliver.
+		close(sub.quit)
 		if unsubscribeServer {
 			sub.requestUnsubscribe()
 		}
 		if err != nil {
 			sub.err <- err
 		}
-		close(sub.quit)
 	})
 }
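
The comment added above says close(sub.quit) has to come before the unsubscribe call
because the dispatch loop may be blocked delivering a notification. A tiny standalone
sketch (made-up channel names, not geth code) of that unblocking behaviour:

    package main

    import "fmt"

    func main() {
        quit := make(chan struct{})
        out := make(chan int) // nobody ever reads this

        done := make(chan struct{})
        go func() { // stand-in for the dispatch loop delivering a notification
            defer close(done)
            select {
            case out <- 42:
                fmt.Println("delivered")
            case <-quit:
                fmt.Println("delivery abandoned") // unblocked by close(quit)
            }
        }()

        close(quit) // close first, exactly like quitWithError does
        <-done      // the blocked send is released, teardown can proceed
    }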
@@ -710,32 +728,46 @@ func (sub *ClientSubscription) start() {
 func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
 	cases := []reflect.SelectCase{
 		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
+		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
 		{Dir: reflect.SelectSend, Chan: sub.channel},
 	}
+	buffer := list.New()
+	defer buffer.Init()
 	for {
-		select {
-		case result := <-sub.in:
-			val, err := sub.unmarshal(result)
+		var chosen int
+		var recv reflect.Value
+		if buffer.Len() == 0 {
+			// Idle, omit send case.
+			chosen, recv, _ = reflect.Select(cases[:2])
+		} else {
+			// Non-empty buffer, send the first queued item.
+			cases[2].Send = reflect.ValueOf(buffer.Front().Value)
+			chosen, recv, _ = reflect.Select(cases)
+		}
+
+		switch chosen {
+		case 0: // <-sub.quit
+			return nil, false
+		case 1: // <-sub.in
+			val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
 			if err != nil {
 				return err, true
 			}
-			cases[1].Send = val
-			switch chosen, _, _ := reflect.Select(cases); chosen {
-			case 0: // <-sub.quit
-				return nil, false
-			case 1: // sub.channel<-
-				continue
+			if buffer.Len() == maxClientSubscriptionBuffer {
+				return ErrSubscriptionQueueOverflow, true
 			}
-		case <-sub.quit:
-			return nil, false
+			buffer.PushBack(val)
+		case 2: // sub.channel<-
+			cases[2].Send = reflect.Value{} // Don't hold onto the value.
+			buffer.Remove(buffer.Front())
 		}
 	}
 }
 
-func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) {
+func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
 	val := reflect.New(sub.etype)
 	err := json.Unmarshal(result, val.Interface())
-	return val.Elem(), err
+	return val.Elem().Interface(), err
 }
 
 func (sub *ClientSubscription) requestUnsubscribe() error {
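
For readers unfamiliar with the pattern in the new forward loop, here is a standalone,
hedged sketch (not go-ethereum code): the send case is only passed to reflect.Select
while the linked-list buffer is non-empty, and Send is cleared after each delivery. All
channel names and the string payload are made up for the example.

    package main

    import (
        "container/list"
        "fmt"
        "reflect"
    )

    func main() {
        quit := make(chan struct{})
        in := make(chan string)  // plays the role of sub.in
        out := make(chan string) // plays the role of sub.channel

        go func() { // producer: a quick burst, then ask the loop to stop
            for _, s := range []string{"a", "b", "c"} {
                in <- s
            }
            close(quit)
        }()

        done := make(chan struct{})
        go func() { // consumer
            defer close(done)
            for s := range out {
                fmt.Println("got", s)
            }
        }()

        cases := []reflect.SelectCase{
            {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(quit)},
            {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(in)},
            {Dir: reflect.SelectSend, Chan: reflect.ValueOf(out)},
        }
        buffer := list.New()
        for {
            var chosen int
            var recv reflect.Value
            if buffer.Len() == 0 {
                chosen, recv, _ = reflect.Select(cases[:2]) // idle: leave the send case out
            } else {
                cases[2].Send = reflect.ValueOf(buffer.Front().Value)
                chosen, recv, _ = reflect.Select(cases)
            }
            switch chosen {
            case 0: // <-quit: stop; anything still buffered is dropped, as in forward()
                close(out)
                <-done
                return
            case 1: // <-in: queue the value instead of blocking on the consumer
                buffer.PushBack(recv.Interface())
            case 2: // out<-: the front of the queue has been delivered
                cases[2].Send = reflect.Value{} // don't hold onto the value
                buffer.Remove(buffer.Front())
            }
        }
    }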

--- a/rpc/client_test.go
+++ b/rpc/client_test.go
@@ -296,6 +296,57 @@ func TestClientSubscribeClose(t *testing.T) {
 	}
 }
 
+// This test checks that Client doesn't lock up when a single subscriber
+// doesn't read subscription events.
+func TestClientNotificationStorm(t *testing.T) {
+	server := newTestServer("eth", new(NotificationTestService))
+	defer server.Stop()
+
+	doTest := func(count int, wantError bool) {
+		client := DialInProc(server)
+		defer client.Close()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+
+		// Subscribe on the server. It will start sending many notifications
+		// very quickly.
+		nc := make(chan int)
+		sub, err := client.EthSubscribe(nc, "someSubscription", count, 0)
+		if err != nil {
+			t.Fatal("can't subscribe:", err)
+		}
+		defer sub.Unsubscribe()
+
+		// Process each notification, try to run a call in between each of them.
+		for i := 0; i < count; i++ {
+			select {
+			case val := <-nc:
+				if val != i {
+					t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
+				}
+			case err := <-sub.Err():
+				if wantError && err != ErrSubscriptionQueueOverflow {
+					t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
+				} else if !wantError {
+					t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
				}
+				return
+			}
+			var r int
+			err := client.CallContext(ctx, &r, "eth_echo", i)
+			if err != nil {
+				if !wantError {
+					t.Fatalf("(%d/%d) call error: %v", i, count, err)
+				}
+				return
+			}
+		}
+	}
+
+	doTest(8000, false)
+	doTest(10000, true)
+}
+
 func TestClientHTTP(t *testing.T) {
 	server := newTestServer("service", new(Service))
 	defer server.Stop()

--- a/rpc/notification_test.go
+++ b/rpc/notification_test.go
@@ -34,6 +34,10 @@ type NotificationTestService struct {
 	unblockHangSubscription chan struct{}
 }
 
+func (s *NotificationTestService) Echo(i int) int {
+	return i
+}
+
 func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
 	s.mu.Lock()
 	defer s.mu.Unlock()
