@ -32,9 +32,6 @@ import (
const (
const (
stopPendingRequestTimeout = 3 * time . Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
stopPendingRequestTimeout = 3 * time . Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
// NotifierContextKey is the key where the notifier associated with the codec is stored in the context
NotifierContextKey = 1
notificationBufferSize = 10000 // max buffered notifications before codec is closed
notificationBufferSize = 10000 // max buffered notifications before codec is closed
DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3"
DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3"
@ -171,7 +168,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
// to send notification to clients. It is thight to the codec/connection. If the
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
// connection is closed the notifier will stop and cancels all active subscriptions.
if options & OptionSubscriptions == OptionSubscriptions {
if options & OptionSubscriptions == OptionSubscriptions {
ctx = context . WithValue ( ctx , NotifierContextKey , newBufferedNotifier ( codec , notificationBufferSize ) )
ctx = context . WithValue ( ctx , notifierKey { } , newBufferedNotifier ( codec , notificationBufferSize ) )
}
}
s . codecsMu . Lock ( )
s . codecsMu . Lock ( )
if atomic . LoadInt32 ( & s . run ) != 1 { // server stopped
if atomic . LoadInt32 ( & s . run ) != 1 { // server stopped
@ -275,7 +272,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
if req . isUnsubscribe { // cancel subscription, first param must be the subscription id
if req . isUnsubscribe { // cancel subscription, first param must be the subscription id
if len ( req . args ) >= 1 && req . args [ 0 ] . Kind ( ) == reflect . String {
if len ( req . args ) >= 1 && req . args [ 0 ] . Kind ( ) == reflect . String {
notifier , supported := ctx . Value ( NotifierContextKey ) . ( * bufferedNotifier )
notifier , supported := NotifierFromContext ( ctx )
if ! supported { // interface doesn't support subscriptions (e.g. http)
if ! supported { // interface doesn't support subscriptions (e.g. http)
return codec . CreateErrorResponse ( & req . id , & callbackError { ErrNotificationsUnsupported . Error ( ) } ) , nil
return codec . CreateErrorResponse ( & req . id , & callbackError { ErrNotificationsUnsupported . Error ( ) } ) , nil
}
}
@ -298,8 +295,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successful sent to the client
// active the subscription after the sub id was successful sent to the client
activateSub := func ( ) {
activateSub := func ( ) {
notifier , _ := ctx . Value ( NotifierContextKey ) . ( * bufferedNotifier )
notifier , _ := NotifierFromContext ( ctx )
notifier . activate ( subid )
notifier . ( * bufferedNotifier ) . activate ( subid )
}
}
return codec . CreateResponse ( req . id , subid ) , activateSub
return codec . CreateResponse ( req . id , subid ) , activateSub