mirror of https://github.com/ethereum/go-ethereum
Merge pull request #2885 from bas-vk/subscriptions
rpc: refactor subscriptions and filterspull/2917/head
commit
3369783e0a
@ -1,297 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"errors" |
|
||||||
"sync" |
|
||||||
"time" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/logger" |
|
||||||
"github.com/ethereum/go-ethereum/logger/glog" |
|
||||||
"golang.org/x/net/context" |
|
||||||
) |
|
||||||
|
|
||||||
var ( |
|
||||||
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
|
|
||||||
ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport") |
|
||||||
|
|
||||||
// ErrNotificationNotFound is returned when the notification for the given id is not found
|
|
||||||
ErrNotificationNotFound = errors.New("notification not found") |
|
||||||
|
|
||||||
// errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
|
|
||||||
errNotifierStopped = errors.New("unable to send notification") |
|
||||||
|
|
||||||
// errNotificationQueueFull is returns when there are too many notifications in the queue
|
|
||||||
errNotificationQueueFull = errors.New("too many pending notifications") |
|
||||||
) |
|
||||||
|
|
||||||
// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
|
|
||||||
// notifications that might be pending in the internal queue.
|
|
||||||
var unsubSignal = new(struct{}) |
|
||||||
|
|
||||||
// UnsubscribeCallback defines a callback that is called when a subcription ends.
|
|
||||||
// It receives the subscription id as argument.
|
|
||||||
type UnsubscribeCallback func(id string) |
|
||||||
|
|
||||||
// notification is a helper object that holds event data for a subscription
|
|
||||||
type notification struct { |
|
||||||
sub *bufferedSubscription // subscription id
|
|
||||||
data interface{} // event data
|
|
||||||
} |
|
||||||
|
|
||||||
// A Notifier type describes the interface for objects that can send create subscriptions
|
|
||||||
type Notifier interface { |
|
||||||
// Create a new subscription. The given callback is called when this subscription
|
|
||||||
// is cancelled (e.g. client send an unsubscribe, connection closed).
|
|
||||||
NewSubscription(UnsubscribeCallback) (Subscription, error) |
|
||||||
// Cancel subscription
|
|
||||||
Unsubscribe(id string) error |
|
||||||
} |
|
||||||
|
|
||||||
type notifierKey struct{} |
|
||||||
|
|
||||||
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
|
||||||
func NotifierFromContext(ctx context.Context) (Notifier, bool) { |
|
||||||
n, ok := ctx.Value(notifierKey{}).(Notifier) |
|
||||||
return n, ok |
|
||||||
} |
|
||||||
|
|
||||||
// Subscription defines the interface for objects that can notify subscribers
|
|
||||||
type Subscription interface { |
|
||||||
// Inform client of an event
|
|
||||||
Notify(data interface{}) error |
|
||||||
// Unique identifier
|
|
||||||
ID() string |
|
||||||
// Cancel subscription
|
|
||||||
Cancel() error |
|
||||||
} |
|
||||||
|
|
||||||
// bufferedSubscription is a subscription that uses a bufferedNotifier to send
|
|
||||||
// notifications to subscribers.
|
|
||||||
type bufferedSubscription struct { |
|
||||||
id string |
|
||||||
unsubOnce sync.Once // call unsub method once
|
|
||||||
unsub UnsubscribeCallback // called on Unsubscribed
|
|
||||||
notifier *bufferedNotifier // forward notifications to
|
|
||||||
pending chan interface{} // closed when active
|
|
||||||
flushed chan interface{} // closed when all buffered notifications are send
|
|
||||||
lastNotification time.Time // last time a notification was send
|
|
||||||
} |
|
||||||
|
|
||||||
// ID returns the subscription identifier that the client uses to refer to this instance.
|
|
||||||
func (s *bufferedSubscription) ID() string { |
|
||||||
return s.id |
|
||||||
} |
|
||||||
|
|
||||||
// Cancel informs the notifier that this subscription is cancelled by the API
|
|
||||||
func (s *bufferedSubscription) Cancel() error { |
|
||||||
return s.notifier.Unsubscribe(s.id) |
|
||||||
} |
|
||||||
|
|
||||||
// Notify the subscriber of a particular event.
|
|
||||||
func (s *bufferedSubscription) Notify(data interface{}) error { |
|
||||||
return s.notifier.send(s.id, data) |
|
||||||
} |
|
||||||
|
|
||||||
// bufferedNotifier is a notifier that queues notifications in an internal queue and
|
|
||||||
// send them as fast as possible to the client from this queue. It will stop if the
|
|
||||||
// queue grows past a given size.
|
|
||||||
type bufferedNotifier struct { |
|
||||||
codec ServerCodec // underlying connection
|
|
||||||
mu sync.Mutex // guard internal state
|
|
||||||
subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
|
|
||||||
queueSize int // max number of items in queue
|
|
||||||
queue chan *notification // notification queue
|
|
||||||
stopped bool // indication if this notifier is ordered to stop
|
|
||||||
} |
|
||||||
|
|
||||||
// newBufferedNotifier returns a notifier that queues notifications in an internal queue
|
|
||||||
// from which notifications are send as fast as possible to the client. If the queue size
|
|
||||||
// limit is reached (client is unable to keep up) it will stop and closes the codec.
|
|
||||||
func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier { |
|
||||||
notifier := &bufferedNotifier{ |
|
||||||
codec: codec, |
|
||||||
subscriptions: make(map[string]*bufferedSubscription), |
|
||||||
queue: make(chan *notification, size), |
|
||||||
queueSize: size, |
|
||||||
} |
|
||||||
|
|
||||||
go notifier.run() |
|
||||||
|
|
||||||
return notifier |
|
||||||
} |
|
||||||
|
|
||||||
// NewSubscription creates a new subscription that forwards events to this instance internal
|
|
||||||
// queue. The given callback is called when the subscription is unsubscribed/cancelled.
|
|
||||||
func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) { |
|
||||||
id, err := newSubscriptionID() |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
if n.stopped { |
|
||||||
return nil, errNotifierStopped |
|
||||||
} |
|
||||||
|
|
||||||
sub := &bufferedSubscription{ |
|
||||||
id: id, |
|
||||||
unsub: callback, |
|
||||||
notifier: n, |
|
||||||
pending: make(chan interface{}), |
|
||||||
flushed: make(chan interface{}), |
|
||||||
lastNotification: time.Now(), |
|
||||||
} |
|
||||||
|
|
||||||
n.subscriptions[id] = sub |
|
||||||
|
|
||||||
return sub, nil |
|
||||||
} |
|
||||||
|
|
||||||
// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
|
|
||||||
func (n *bufferedNotifier) Unsubscribe(subid string) error { |
|
||||||
n.mu.Lock() |
|
||||||
sub, found := n.subscriptions[subid] |
|
||||||
n.mu.Unlock() |
|
||||||
|
|
||||||
if found { |
|
||||||
// send the unsubscribe signal, this will cause the notifier not to accept new events
|
|
||||||
// for this subscription and will close the flushed channel after the last (buffered)
|
|
||||||
// notification was send to the client.
|
|
||||||
if err := n.send(subid, unsubSignal); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
// wait for confirmation that all (buffered) events are send for this subscription.
|
|
||||||
// this ensures that the unsubscribe method response is not send before all buffered
|
|
||||||
// events for this subscription are send.
|
|
||||||
<-sub.flushed |
|
||||||
|
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
return ErrNotificationNotFound |
|
||||||
} |
|
||||||
|
|
||||||
// Send enques the given data for the subscription with public ID on the internal queue. t returns
|
|
||||||
// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
|
|
||||||
// will remove the subscription with the given id from the subscription collection.
|
|
||||||
func (n *bufferedNotifier) send(id string, data interface{}) error { |
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
if n.stopped { |
|
||||||
return errNotifierStopped |
|
||||||
} |
|
||||||
|
|
||||||
var ( |
|
||||||
subscription *bufferedSubscription |
|
||||||
found bool |
|
||||||
) |
|
||||||
|
|
||||||
// check if subscription is associated with this connection, it might be cancelled
|
|
||||||
// (subscribe/connection closed)
|
|
||||||
if subscription, found = n.subscriptions[id]; !found { |
|
||||||
glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id) |
|
||||||
return ErrNotificationNotFound |
|
||||||
} |
|
||||||
|
|
||||||
// received the unsubscribe signal. Add it to the queue to make sure any pending notifications
|
|
||||||
// for this subscription are send. When the run loop receives this singal it will signal that
|
|
||||||
// all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
|
|
||||||
// send to the user. Remove the subscriptions to make sure new notifications are not accepted.
|
|
||||||
if data == unsubSignal { |
|
||||||
delete(n.subscriptions, id) |
|
||||||
if subscription.unsub != nil { |
|
||||||
subscription.unsubOnce.Do(func() { subscription.unsub(id) }) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
subscription.lastNotification = time.Now() |
|
||||||
|
|
||||||
if len(n.queue) >= n.queueSize { |
|
||||||
glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection") |
|
||||||
n.codec.Close() |
|
||||||
return errNotificationQueueFull |
|
||||||
} |
|
||||||
|
|
||||||
n.queue <- ¬ification{subscription, data} |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
// run reads notifications from the internal queue and sends them to the client. In case of an
|
|
||||||
// error, or when the codec is closed it will cancel all active subscriptions and returns.
|
|
||||||
func (n *bufferedNotifier) run() { |
|
||||||
defer func() { |
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
n.stopped = true |
|
||||||
close(n.queue) |
|
||||||
|
|
||||||
// on exit call unsubscribe callback
|
|
||||||
for id, sub := range n.subscriptions { |
|
||||||
if sub.unsub != nil { |
|
||||||
sub.unsubOnce.Do(func() { sub.unsub(id) }) |
|
||||||
} |
|
||||||
close(sub.flushed) |
|
||||||
delete(n.subscriptions, id) |
|
||||||
} |
|
||||||
}() |
|
||||||
|
|
||||||
for { |
|
||||||
select { |
|
||||||
case notification := <-n.queue: |
|
||||||
// It can happen that an event is raised before the RPC server was able to send the sub
|
|
||||||
// id to the client. Therefore subscriptions are marked as pending until the sub id was
|
|
||||||
// send. The RPC server will activate the subscription by closing the pending chan.
|
|
||||||
<-notification.sub.pending |
|
||||||
|
|
||||||
if notification.data == unsubSignal { |
|
||||||
// unsubSignal is the last accepted message for this subscription. Raise the signal
|
|
||||||
// that all buffered notifications are sent by closing the flushed channel. This
|
|
||||||
// indicates that the response for the unsubscribe can be send to the client.
|
|
||||||
close(notification.sub.flushed) |
|
||||||
} else { |
|
||||||
msg := n.codec.CreateNotification(notification.sub.id, notification.data) |
|
||||||
if err := n.codec.Write(msg); err != nil { |
|
||||||
n.codec.Close() |
|
||||||
// unable to send notification to client, unsubscribe all subscriptions
|
|
||||||
glog.V(logger.Warn).Infof("unable to send notification - %v\n", err) |
|
||||||
return |
|
||||||
} |
|
||||||
} |
|
||||||
case <-n.codec.Closed(): // connection was closed
|
|
||||||
glog.V(logger.Debug).Infoln("codec closed, stop subscriptions") |
|
||||||
return |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Marks the subscription as active. This will causes the notifications for this subscription to be
|
|
||||||
// forwarded to the client.
|
|
||||||
func (n *bufferedNotifier) activate(subid string) { |
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
if sub, found := n.subscriptions[subid]; found { |
|
||||||
close(sub.pending) |
|
||||||
} |
|
||||||
} |
|
@ -0,0 +1,135 @@ |
|||||||
|
// Copyright 2016 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package rpc |
||||||
|
|
||||||
|
import ( |
||||||
|
"errors" |
||||||
|
"sync" |
||||||
|
|
||||||
|
"golang.org/x/net/context" |
||||||
|
) |
||||||
|
|
||||||
|
var ( |
||||||
|
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
|
||||||
|
ErrNotificationsUnsupported = errors.New("notifications not supported") |
||||||
|
// ErrNotificationNotFound is returned when the notification for the given id is not found
|
||||||
|
ErrSubscriptionNotFound = errors.New("subscription not found") |
||||||
|
) |
||||||
|
|
||||||
|
// ID defines a psuedo random number that is used to identify RPC subscriptions.
|
||||||
|
type ID string |
||||||
|
|
||||||
|
// a Subscription is created by a notifier and tight to that notifier. The client can use
|
||||||
|
// this subscription to wait for an unsubscribe request for the client, see Err().
|
||||||
|
type Subscription struct { |
||||||
|
ID ID |
||||||
|
err chan error // closed on unsubscribe
|
||||||
|
} |
||||||
|
|
||||||
|
// Err returns a channel that is closed when the client send an unsubscribe request.
|
||||||
|
func (s *Subscription) Err() <-chan error { |
||||||
|
return s.err |
||||||
|
} |
||||||
|
|
||||||
|
// notifierKey is used to store a notifier within the connection context.
|
||||||
|
type notifierKey struct{} |
||||||
|
|
||||||
|
// Notifier is tight to a RPC connection that supports subscriptions.
|
||||||
|
// Server callbacks use the notifier to send notifications.
|
||||||
|
type Notifier struct { |
||||||
|
codec ServerCodec |
||||||
|
subMu sync.RWMutex // guards active and inactive maps
|
||||||
|
stopped bool |
||||||
|
active map[ID]*Subscription |
||||||
|
inactive map[ID]*Subscription |
||||||
|
} |
||||||
|
|
||||||
|
// newNotifier creates a new notifier that can be used to send subscription
|
||||||
|
// notifications to the client.
|
||||||
|
func newNotifier(codec ServerCodec) *Notifier { |
||||||
|
return &Notifier{ |
||||||
|
codec: codec, |
||||||
|
active: make(map[ID]*Subscription), |
||||||
|
inactive: make(map[ID]*Subscription), |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
||||||
|
func NotifierFromContext(ctx context.Context) (*Notifier, bool) { |
||||||
|
n, ok := ctx.Value(notifierKey{}).(*Notifier) |
||||||
|
return n, ok |
||||||
|
} |
||||||
|
|
||||||
|
// CreateSubscription returns a new subscription that is coupled to the
|
||||||
|
// RPC connection. By default subscriptions are inactive and notifications
|
||||||
|
// are dropped until the subscription is marked as active. This is done
|
||||||
|
// by the RPC server after the subscription ID is send to the client.
|
||||||
|
func (n *Notifier) CreateSubscription() *Subscription { |
||||||
|
s := &Subscription{NewID(), make(chan error)} |
||||||
|
n.subMu.Lock() |
||||||
|
n.inactive[s.ID] = s |
||||||
|
n.subMu.Unlock() |
||||||
|
return s |
||||||
|
} |
||||||
|
|
||||||
|
// Notify sends a notification to the client with the given data as payload.
|
||||||
|
// If an error occurs the RPC connection is closed and the error is returned.
|
||||||
|
func (n *Notifier) Notify(id ID, data interface{}) error { |
||||||
|
n.subMu.RLock() |
||||||
|
defer n.subMu.RUnlock() |
||||||
|
|
||||||
|
_, active := n.active[id] |
||||||
|
if active { |
||||||
|
notification := n.codec.CreateNotification(string(id), data) |
||||||
|
if err := n.codec.Write(notification); err != nil { |
||||||
|
n.codec.Close() |
||||||
|
return err |
||||||
|
} |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// Closed returns a channel that is closed when the RPC connection is closed.
|
||||||
|
func (n *Notifier) Closed() <-chan interface{} { |
||||||
|
return n.codec.Closed() |
||||||
|
} |
||||||
|
|
||||||
|
// unsubscribe a subscription.
|
||||||
|
// If the subscription could not be found ErrSubscriptionNotFound is returned.
|
||||||
|
func (n *Notifier) unsubscribe(id ID) error { |
||||||
|
n.subMu.Lock() |
||||||
|
defer n.subMu.Unlock() |
||||||
|
if s, found := n.active[id]; found { |
||||||
|
close(s.err) |
||||||
|
delete(n.active, id) |
||||||
|
return nil |
||||||
|
} |
||||||
|
return ErrSubscriptionNotFound |
||||||
|
} |
||||||
|
|
||||||
|
// activate enables a subscription. Until a subscription is enabled all
|
||||||
|
// notifications are dropped. This method is called by the RPC server after
|
||||||
|
// the subscription ID was sent to client. This prevents notifications being
|
||||||
|
// send to the client before the subscription ID is send to the client.
|
||||||
|
func (n *Notifier) activate(id ID) { |
||||||
|
n.subMu.Lock() |
||||||
|
defer n.subMu.Unlock() |
||||||
|
if sub, found := n.inactive[id]; found { |
||||||
|
n.active[id] = sub |
||||||
|
delete(n.inactive, id) |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue