mirror of https://github.com/ethereum/go-ethereum
This commit introduces a new Subscription type, which is synonymous with ethereum.Subscription. It also adds a couple of utilities that make working with Subscriptions easier. The mot complex utility is Feed, a synchronisation device that implements broadcast subscriptions. Feed is slightly faster than TypeMux and will replace uses of TypeMux across the go-ethereum codebase in the future.pull/3605/head
parent
9b62facdd4
commit
6d5e100d0d
@ -0,0 +1,73 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event_test |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/ethereum/go-ethereum/event" |
||||
) |
||||
|
||||
func ExampleFeed_acknowledgedEvents() { |
||||
// This example shows how the return value of Send can be used for request/reply
|
||||
// interaction between event consumers and producers.
|
||||
var feed event.Feed |
||||
type ackedEvent struct { |
||||
i int |
||||
ack chan<- struct{} |
||||
} |
||||
|
||||
// Consumers wait for events on the feed and acknowledge processing.
|
||||
done := make(chan struct{}) |
||||
defer close(done) |
||||
for i := 0; i < 3; i++ { |
||||
ch := make(chan ackedEvent, 100) |
||||
sub := feed.Subscribe(ch) |
||||
go func() { |
||||
defer sub.Unsubscribe() |
||||
for { |
||||
select { |
||||
case ev := <-ch: |
||||
fmt.Println(ev.i) // "process" the event
|
||||
ev.ack <- struct{}{} |
||||
case <-done: |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
|
||||
// The producer sends values of type ackedEvent with increasing values of i.
|
||||
// It waits for all consumers to acknowledge before sending the next event.
|
||||
for i := 0; i < 3; i++ { |
||||
acksignal := make(chan struct{}) |
||||
n := feed.Send(ackedEvent{i, acksignal}) |
||||
for ack := 0; ack < n; ack++ { |
||||
<-acksignal |
||||
} |
||||
} |
||||
// Output:
|
||||
// 0
|
||||
// 0
|
||||
// 0
|
||||
// 1
|
||||
// 1
|
||||
// 1
|
||||
// 2
|
||||
// 2
|
||||
// 2
|
||||
} |
@ -0,0 +1,128 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event_test |
||||
|
||||
import ( |
||||
"fmt" |
||||
"sync" |
||||
|
||||
"github.com/ethereum/go-ethereum/event" |
||||
) |
||||
|
||||
// This example demonstrates how SubscriptionScope can be used to control the lifetime of
|
||||
// subscriptions.
|
||||
//
|
||||
// Our example program consists of two servers, each of which performs a calculation when
|
||||
// requested. The servers also allow subscribing to results of all computations.
|
||||
type divServer struct{ results event.Feed } |
||||
type mulServer struct{ results event.Feed } |
||||
|
||||
func (s *divServer) do(a, b int) int { |
||||
r := a / b |
||||
s.results.Send(r) |
||||
return r |
||||
} |
||||
|
||||
func (s *mulServer) do(a, b int) int { |
||||
r := a * b |
||||
s.results.Send(r) |
||||
return r |
||||
} |
||||
|
||||
// The servers are contained in an App. The app controls the servers and exposes them
|
||||
// through its API.
|
||||
type App struct { |
||||
divServer |
||||
mulServer |
||||
scope event.SubscriptionScope |
||||
} |
||||
|
||||
func (s *App) Calc(op byte, a, b int) int { |
||||
switch op { |
||||
case '/': |
||||
return s.divServer.do(a, b) |
||||
case '*': |
||||
return s.mulServer.do(a, b) |
||||
default: |
||||
panic("invalid op") |
||||
} |
||||
} |
||||
|
||||
// The app's SubscribeResults method starts sending calculation results to the given
|
||||
// channel. Subscriptions created through this method are tied to the lifetime of the App
|
||||
// because they are registered in the scope.
|
||||
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription { |
||||
switch op { |
||||
case '/': |
||||
return s.scope.Track(s.divServer.results.Subscribe(ch)) |
||||
case '*': |
||||
return s.scope.Track(s.mulServer.results.Subscribe(ch)) |
||||
default: |
||||
panic("invalid op") |
||||
} |
||||
} |
||||
|
||||
// Stop stops the App, closing all subscriptions created through SubscribeResults.
|
||||
func (s *App) Stop() { |
||||
s.scope.Close() |
||||
} |
||||
|
||||
func ExampleSubscriptionScope() { |
||||
// Create the app.
|
||||
var ( |
||||
app App |
||||
wg sync.WaitGroup |
||||
divs = make(chan int) |
||||
muls = make(chan int) |
||||
) |
||||
|
||||
// Run a subscriber in the background.
|
||||
divsub := app.SubscribeResults('/', divs) |
||||
mulsub := app.SubscribeResults('*', muls) |
||||
wg.Add(1) |
||||
go func() { |
||||
defer wg.Done() |
||||
defer fmt.Println("subscriber exited") |
||||
defer divsub.Unsubscribe() |
||||
defer mulsub.Unsubscribe() |
||||
for { |
||||
select { |
||||
case result := <-divs: |
||||
fmt.Println("division happened:", result) |
||||
case result := <-muls: |
||||
fmt.Println("multiplication happened:", result) |
||||
case <-divsub.Err(): |
||||
return |
||||
case <-mulsub.Err(): |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
|
||||
// Interact with the app.
|
||||
app.Calc('/', 22, 11) |
||||
app.Calc('*', 3, 4) |
||||
|
||||
// Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
|
||||
app.Stop() |
||||
wg.Wait() |
||||
|
||||
// Output:
|
||||
// division happened: 2
|
||||
// multiplication happened: 12
|
||||
// subscriber exited
|
||||
} |
@ -0,0 +1,56 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event_test |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/ethereum/go-ethereum/event" |
||||
) |
||||
|
||||
func ExampleNewSubscription() { |
||||
// Create a subscription that sends 10 integers on ch.
|
||||
ch := make(chan int) |
||||
sub := event.NewSubscription(func(quit <-chan struct{}) error { |
||||
for i := 0; i < 10; i++ { |
||||
select { |
||||
case ch <- i: |
||||
case <-quit: |
||||
fmt.Println("unsubscribed") |
||||
return nil |
||||
} |
||||
} |
||||
return nil |
||||
}) |
||||
|
||||
// This is the consumer. It reads 5 integers, then aborts the subscription.
|
||||
// Note that Unsubscribe waits until the producer has shut down.
|
||||
for i := range ch { |
||||
fmt.Println(i) |
||||
if i == 4 { |
||||
sub.Unsubscribe() |
||||
break |
||||
} |
||||
} |
||||
// Output:
|
||||
// 0
|
||||
// 1
|
||||
// 2
|
||||
// 3
|
||||
// 4
|
||||
// unsubscribed
|
||||
} |
@ -0,0 +1,240 @@ |
||||
// Copyright 2016 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event |
||||
|
||||
import ( |
||||
"errors" |
||||
"reflect" |
||||
"sync" |
||||
) |
||||
|
||||
var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type") |
||||
|
||||
// Feed implements one-to-many subscriptions where the carrier of events is a channel.
|
||||
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
|
||||
//
|
||||
// Feeds can only be used with a single type. The type is determined by the first Send or
|
||||
// Subscribe operation. Subsequent calls to these methods panic if the type does not
|
||||
// match.
|
||||
//
|
||||
// The zero value is ready to use.
|
||||
type Feed struct { |
||||
sendLock chan struct{} // one-element buffer, empty when held
|
||||
removeSub chan interface{} // interrupts Send
|
||||
sendCases caseList // the active set of select cases used by Send
|
||||
|
||||
// The inbox holds newly subscribed channels until they are added to sendCases.
|
||||
mu sync.Mutex |
||||
inbox caseList |
||||
etype reflect.Type |
||||
closed bool |
||||
} |
||||
|
||||
type feedTypeError struct { |
||||
got, want reflect.Type |
||||
op string |
||||
} |
||||
|
||||
func (e feedTypeError) Error() string { |
||||
return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String() |
||||
} |
||||
|
||||
func (f *Feed) init() { |
||||
if f.sendLock != nil { |
||||
return |
||||
} |
||||
f.removeSub = make(chan interface{}) |
||||
f.sendLock = make(chan struct{}, 1) |
||||
f.sendLock <- struct{}{} |
||||
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}} |
||||
} |
||||
|
||||
// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
|
||||
// until the subscription is canceled. All channels added must have the same element type.
|
||||
//
|
||||
// The channel should have ample buffer space to avoid blocking other subscribers.
|
||||
func (f *Feed) Subscribe(channel interface{}) Subscription { |
||||
chanval := reflect.ValueOf(channel) |
||||
chantyp := chanval.Type() |
||||
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { |
||||
panic(errBadChannel) |
||||
} |
||||
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} |
||||
|
||||
f.mu.Lock() |
||||
defer f.mu.Unlock() |
||||
f.init() |
||||
if !f.typecheck(chantyp.Elem()) { |
||||
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) |
||||
} |
||||
// Add the select case to the inbox.
|
||||
// The next Send will add it to f.sendCases.
|
||||
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} |
||||
f.inbox = append(f.inbox, cas) |
||||
return sub |
||||
} |
||||
|
||||
// note: callers must hold f.mu
|
||||
func (f *Feed) typecheck(typ reflect.Type) bool { |
||||
if f.etype == nil { |
||||
f.etype = typ |
||||
return true |
||||
} |
||||
return f.etype == typ |
||||
} |
||||
|
||||
func (f *Feed) remove(sub *feedSub) { |
||||
// Delete from inbox first, which covers channels
|
||||
// that have not been added to f.sendCases yet.
|
||||
ch := sub.channel.Interface() |
||||
f.mu.Lock() |
||||
index := f.inbox.find(ch) |
||||
if index != -1 { |
||||
f.inbox = f.inbox.delete(index) |
||||
f.mu.Unlock() |
||||
return |
||||
} |
||||
f.mu.Unlock() |
||||
|
||||
select { |
||||
case f.removeSub <- ch: |
||||
// Send will remove the channel from f.sendCases.
|
||||
case <-f.sendLock: |
||||
// No Send is in progress, delete the channel now that we have the send lock.
|
||||
f.sendCases = f.sendCases.delete(f.sendCases.find(ch)) |
||||
f.sendLock <- struct{}{} |
||||
} |
||||
} |
||||
|
||||
// Send delivers to all subscribed channels simultaneously.
|
||||
// It returns the number of subscribers that the value was sent to.
|
||||
func (f *Feed) Send(value interface{}) (nsent int) { |
||||
f.mu.Lock() |
||||
f.init() |
||||
<-f.sendLock |
||||
// Add new subscriptions from the inbox, then clear it.
|
||||
f.sendCases = append(f.sendCases, f.inbox...) |
||||
for i := range f.inbox { |
||||
f.inbox[i] = reflect.SelectCase{} |
||||
} |
||||
f.inbox = f.inbox[:0] |
||||
f.mu.Unlock() |
||||
|
||||
// Set the sent value on all channels.
|
||||
rvalue := reflect.ValueOf(value) |
||||
if !f.typecheck(rvalue.Type()) { |
||||
f.sendLock <- struct{}{} |
||||
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) |
||||
} |
||||
for i := 1; i < len(f.sendCases); i++ { |
||||
f.sendCases[i].Send = rvalue |
||||
} |
||||
|
||||
// Send until all channels except removeSub have been chosen.
|
||||
cases := f.sendCases |
||||
for { |
||||
// Fast path: try sending without blocking before adding to the select set.
|
||||
// This should usually succeed if subscribers are fast enough and have free
|
||||
// buffer space.
|
||||
for i := 1; i < len(cases); i++ { |
||||
if cases[i].Chan.TrySend(rvalue) { |
||||
cases = cases.deactivate(i) |
||||
nsent++ |
||||
} |
||||
} |
||||
if len(cases) == 1 { |
||||
break |
||||
} |
||||
// Select on all the receivers, waiting for them to unblock.
|
||||
chosen, recv, _ := reflect.Select(cases) |
||||
if chosen == 0 /* <-f.removeSub */ { |
||||
index := f.sendCases.find(recv.Interface()) |
||||
f.sendCases = f.sendCases.delete(index) |
||||
if index >= 0 && index < len(cases) { |
||||
cases = f.sendCases[:len(cases)-1] |
||||
} |
||||
} else { |
||||
cases = cases.deactivate(chosen) |
||||
nsent++ |
||||
} |
||||
} |
||||
|
||||
// Forget about the sent value and hand off the send lock.
|
||||
for i := 1; i < len(f.sendCases); i++ { |
||||
f.sendCases[i].Send = reflect.Value{} |
||||
} |
||||
f.sendLock <- struct{}{} |
||||
return nsent |
||||
} |
||||
|
||||
type feedSub struct { |
||||
feed *Feed |
||||
channel reflect.Value |
||||
errOnce sync.Once |
||||
err chan error |
||||
} |
||||
|
||||
func (sub *feedSub) Unsubscribe() { |
||||
sub.errOnce.Do(func() { |
||||
sub.feed.remove(sub) |
||||
close(sub.err) |
||||
}) |
||||
} |
||||
|
||||
func (sub *feedSub) Err() <-chan error { |
||||
return sub.err |
||||
} |
||||
|
||||
type caseList []reflect.SelectCase |
||||
|
||||
// find returns the index of a case containing the given channel.
|
||||
func (cs caseList) find(channel interface{}) int { |
||||
for i, cas := range cs { |
||||
if cas.Chan.Interface() == channel { |
||||
return i |
||||
} |
||||
} |
||||
return -1 |
||||
} |
||||
|
||||
// delete removes the given case from cs.
|
||||
func (cs caseList) delete(index int) caseList { |
||||
return append(cs[:index], cs[index+1:]...) |
||||
} |
||||
|
||||
// deactivate moves the case at index into the non-accessible portion of the cs slice.
|
||||
func (cs caseList) deactivate(index int) caseList { |
||||
last := len(cs) - 1 |
||||
cs[index], cs[last] = cs[last], cs[index] |
||||
return cs[:last] |
||||
} |
||||
|
||||
// func (cs caseList) String() string {
|
||||
// s := "["
|
||||
// for i, cas := range cs {
|
||||
// if i != 0 {
|
||||
// s += ", "
|
||||
// }
|
||||
// switch cas.Dir {
|
||||
// case reflect.SelectSend:
|
||||
// s += fmt.Sprintf("%v<-", cas.Chan.Interface())
|
||||
// case reflect.SelectRecv:
|
||||
// s += fmt.Sprintf("<-%v", cas.Chan.Interface())
|
||||
// }
|
||||
// }
|
||||
// return s + "]"
|
||||
// }
|
@ -0,0 +1,226 @@ |
||||
// Copyright 2016 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event |
||||
|
||||
import ( |
||||
"fmt" |
||||
"reflect" |
||||
"sync" |
||||
"testing" |
||||
"time" |
||||
) |
||||
|
||||
func TestFeedPanics(t *testing.T) { |
||||
{ |
||||
var f Feed |
||||
f.Send(int(2)) |
||||
want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))} |
||||
if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil { |
||||
t.Error(err) |
||||
} |
||||
} |
||||
{ |
||||
var f Feed |
||||
ch := make(chan int) |
||||
f.Subscribe(ch) |
||||
want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))} |
||||
if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil { |
||||
t.Error(err) |
||||
} |
||||
} |
||||
{ |
||||
var f Feed |
||||
f.Send(int(2)) |
||||
want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))} |
||||
if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil { |
||||
t.Error(err) |
||||
} |
||||
} |
||||
{ |
||||
var f Feed |
||||
if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil { |
||||
t.Error(err) |
||||
} |
||||
} |
||||
{ |
||||
var f Feed |
||||
if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil { |
||||
t.Error(err) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func checkPanic(want error, fn func()) (err error) { |
||||
defer func() { |
||||
panic := recover() |
||||
if panic == nil { |
||||
err = fmt.Errorf("didn't panic") |
||||
} else if !reflect.DeepEqual(panic, want) { |
||||
err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want) |
||||
} |
||||
}() |
||||
fn() |
||||
return nil |
||||
} |
||||
|
||||
func TestFeed(t *testing.T) { |
||||
var feed Feed |
||||
var done, subscribed sync.WaitGroup |
||||
subscriber := func(i int) { |
||||
defer done.Done() |
||||
|
||||
subchan := make(chan int) |
||||
sub := feed.Subscribe(subchan) |
||||
timeout := time.NewTimer(2 * time.Second) |
||||
subscribed.Done() |
||||
|
||||
select { |
||||
case v := <-subchan: |
||||
if v != 1 { |
||||
t.Errorf("%d: received value %d, want 1", i, v) |
||||
} |
||||
case <-timeout.C: |
||||
t.Errorf("%d: receive timeout", i) |
||||
} |
||||
|
||||
sub.Unsubscribe() |
||||
select { |
||||
case _, ok := <-sub.Err(): |
||||
if ok { |
||||
t.Errorf("%d: error channel not closed after unsubscribe", i) |
||||
} |
||||
case <-timeout.C: |
||||
t.Errorf("%d: unsubscribe timeout", i) |
||||
} |
||||
} |
||||
|
||||
const n = 1000 |
||||
done.Add(n) |
||||
subscribed.Add(n) |
||||
for i := 0; i < n; i++ { |
||||
go subscriber(i) |
||||
} |
||||
subscribed.Wait() |
||||
if nsent := feed.Send(1); nsent != n { |
||||
t.Errorf("first send delivered %d times, want %d", nsent, n) |
||||
} |
||||
if nsent := feed.Send(2); nsent != 0 { |
||||
t.Errorf("second send delivered %d times, want 0", nsent) |
||||
} |
||||
done.Wait() |
||||
} |
||||
|
||||
func TestFeedSubscribeSameChannel(t *testing.T) { |
||||
var ( |
||||
feed Feed |
||||
done sync.WaitGroup |
||||
ch = make(chan int) |
||||
sub1 = feed.Subscribe(ch) |
||||
sub2 = feed.Subscribe(ch) |
||||
_ = feed.Subscribe(ch) |
||||
) |
||||
expectSends := func(value, n int) { |
||||
if nsent := feed.Send(value); nsent != n { |
||||
t.Errorf("send delivered %d times, want %d", nsent, n) |
||||
} |
||||
done.Done() |
||||
} |
||||
expectRecv := func(wantValue, n int) { |
||||
for i := 0; i < n; i++ { |
||||
if v := <-ch; v != wantValue { |
||||
t.Errorf("received %d, want %d", v, wantValue) |
||||
} |
||||
} |
||||
} |
||||
|
||||
done.Add(1) |
||||
go expectSends(1, 3) |
||||
expectRecv(1, 3) |
||||
done.Wait() |
||||
|
||||
sub1.Unsubscribe() |
||||
|
||||
done.Add(1) |
||||
go expectSends(2, 2) |
||||
expectRecv(2, 2) |
||||
done.Wait() |
||||
|
||||
sub2.Unsubscribe() |
||||
|
||||
done.Add(1) |
||||
go expectSends(3, 1) |
||||
expectRecv(3, 1) |
||||
done.Wait() |
||||
} |
||||
|
||||
func TestFeedUnsubscribeFromInbox(t *testing.T) { |
||||
var ( |
||||
feed Feed |
||||
ch1 = make(chan int) |
||||
ch2 = make(chan int) |
||||
sub1 = feed.Subscribe(ch1) |
||||
sub2 = feed.Subscribe(ch1) |
||||
sub3 = feed.Subscribe(ch2) |
||||
) |
||||
if len(feed.inbox) != 3 { |
||||
t.Errorf("inbox length != 3 after subscribe") |
||||
} |
||||
if len(feed.sendCases) != 1 { |
||||
t.Errorf("sendCases is non-empty after unsubscribe") |
||||
} |
||||
|
||||
sub1.Unsubscribe() |
||||
sub2.Unsubscribe() |
||||
sub3.Unsubscribe() |
||||
if len(feed.inbox) != 0 { |
||||
t.Errorf("inbox is non-empty after unsubscribe") |
||||
} |
||||
if len(feed.sendCases) != 1 { |
||||
t.Errorf("sendCases is non-empty after unsubscribe") |
||||
} |
||||
} |
||||
|
||||
func BenchmarkFeedSend1000(b *testing.B) { |
||||
var ( |
||||
done sync.WaitGroup |
||||
feed Feed |
||||
nsubs = 1000 |
||||
) |
||||
subscriber := func(ch <-chan int) { |
||||
for i := 0; i < b.N; i++ { |
||||
<-ch |
||||
} |
||||
done.Done() |
||||
} |
||||
done.Add(nsubs) |
||||
for i := 0; i < nsubs; i++ { |
||||
ch := make(chan int, 200) |
||||
feed.Subscribe(ch) |
||||
go subscriber(ch) |
||||
} |
||||
|
||||
// The actual benchmark.
|
||||
b.ResetTimer() |
||||
for i := 0; i < b.N; i++ { |
||||
if feed.Send(i) != nsubs { |
||||
panic("wrong number of sends") |
||||
} |
||||
} |
||||
|
||||
b.StopTimer() |
||||
done.Wait() |
||||
} |
@ -0,0 +1,275 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event |
||||
|
||||
import ( |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock" |
||||
"golang.org/x/net/context" |
||||
) |
||||
|
||||
// Subscription represents a stream of events. The carrier of the events is typically a
|
||||
// channel, but isn't part of the interface.
|
||||
//
|
||||
// Subscriptions can fail while established. Failures are reported through an error
|
||||
// channel. It receives a value if there is an issue with the subscription (e.g. the
|
||||
// network connection delivering the events has been closed). Only one value will ever be
|
||||
// sent.
|
||||
//
|
||||
// The error channel is closed when the subscription ends successfully (i.e. when the
|
||||
// source of events is closed). It is also closed when Unsubscribe is called.
|
||||
//
|
||||
// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
|
||||
// cases to ensure that resources related to the subscription are released. It can be
|
||||
// called any number of times.
|
||||
type Subscription interface { |
||||
Err() <-chan error // returns the error channel
|
||||
Unsubscribe() // cancels sending of events, closing the error channel
|
||||
} |
||||
|
||||
// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn
|
||||
// is closed when Unsubscribe is called. If fn returns an error, it is sent on the
|
||||
// subscription's error channel.
|
||||
func NewSubscription(fn func(<-chan struct{}) error) Subscription { |
||||
s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} |
||||
go func() { |
||||
defer close(s.err) |
||||
err := fn(s.unsub) |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
if !s.unsubscribed { |
||||
if err != nil { |
||||
s.err <- err |
||||
} |
||||
s.unsubscribed = true |
||||
} |
||||
}() |
||||
return s |
||||
} |
||||
|
||||
type funcSub struct { |
||||
unsub chan struct{} |
||||
err chan error |
||||
mu sync.Mutex |
||||
unsubscribed bool |
||||
} |
||||
|
||||
func (s *funcSub) Unsubscribe() { |
||||
s.mu.Lock() |
||||
if s.unsubscribed { |
||||
s.mu.Unlock() |
||||
return |
||||
} |
||||
s.unsubscribed = true |
||||
close(s.unsub) |
||||
s.mu.Unlock() |
||||
// Wait for producer shutdown.
|
||||
<-s.err |
||||
} |
||||
|
||||
func (s *funcSub) Err() <-chan error { |
||||
return s.err |
||||
} |
||||
|
||||
// Resubscribe calls fn repeatedly to keep a subscription established. When the
|
||||
// subscription is established, Resubscribe waits for it to fail and calls fn again. This
|
||||
// process repeats until Unsubscribe is called or the active subscription ends
|
||||
// successfully.
|
||||
//
|
||||
// Resubscribe applies backoff between calls to fn. The time between calls is adapted
|
||||
// based on the error rate, but will never exceed backoffMax.
|
||||
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription { |
||||
s := &resubscribeSub{ |
||||
waitTime: backoffMax / 10, |
||||
backoffMax: backoffMax, |
||||
fn: fn, |
||||
err: make(chan error), |
||||
unsub: make(chan struct{}), |
||||
} |
||||
go s.loop() |
||||
return s |
||||
} |
||||
|
||||
// A ResubscribeFunc attempts to establish a subscription.
|
||||
type ResubscribeFunc func(context.Context) (Subscription, error) |
||||
|
||||
type resubscribeSub struct { |
||||
fn ResubscribeFunc |
||||
err chan error |
||||
unsub chan struct{} |
||||
unsubOnce sync.Once |
||||
lastTry mclock.AbsTime |
||||
waitTime, backoffMax time.Duration |
||||
} |
||||
|
||||
func (s *resubscribeSub) Unsubscribe() { |
||||
s.unsubOnce.Do(func() { |
||||
s.unsub <- struct{}{} |
||||
<-s.err |
||||
}) |
||||
} |
||||
|
||||
func (s *resubscribeSub) Err() <-chan error { |
||||
return s.err |
||||
} |
||||
|
||||
func (s *resubscribeSub) loop() { |
||||
defer close(s.err) |
||||
var done bool |
||||
for !done { |
||||
sub := s.subscribe() |
||||
if sub == nil { |
||||
break |
||||
} |
||||
done = s.waitForError(sub) |
||||
sub.Unsubscribe() |
||||
} |
||||
} |
||||
|
||||
func (s *resubscribeSub) subscribe() Subscription { |
||||
subscribed := make(chan error) |
||||
var sub Subscription |
||||
retry: |
||||
for { |
||||
s.lastTry = mclock.Now() |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
go func() { |
||||
rsub, err := s.fn(ctx) |
||||
sub = rsub |
||||
subscribed <- err |
||||
}() |
||||
select { |
||||
case err := <-subscribed: |
||||
cancel() |
||||
if err != nil { |
||||
// Subscribing failed, wait before launching the next try.
|
||||
if s.backoffWait() { |
||||
return nil |
||||
} |
||||
continue retry |
||||
} |
||||
if sub == nil { |
||||
panic("event: ResubscribeFunc returned nil subscription and no error") |
||||
} |
||||
return sub |
||||
case <-s.unsub: |
||||
cancel() |
||||
return nil |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (s *resubscribeSub) waitForError(sub Subscription) bool { |
||||
defer sub.Unsubscribe() |
||||
select { |
||||
case err := <-sub.Err(): |
||||
return err == nil |
||||
case <-s.unsub: |
||||
return true |
||||
} |
||||
} |
||||
|
||||
func (s *resubscribeSub) backoffWait() bool { |
||||
if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax { |
||||
s.waitTime = s.backoffMax / 10 |
||||
} else { |
||||
s.waitTime *= 2 |
||||
if s.waitTime > s.backoffMax { |
||||
s.waitTime = s.backoffMax |
||||
} |
||||
} |
||||
|
||||
t := time.NewTimer(s.waitTime) |
||||
defer t.Stop() |
||||
select { |
||||
case <-t.C: |
||||
return false |
||||
case <-s.unsub: |
||||
return true |
||||
} |
||||
} |
||||
|
||||
// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
|
||||
//
|
||||
// For code that handle more than one subscription, a scope can be used to conveniently
|
||||
// unsubscribe all of them with a single call. The example demonstrates a typical use in a
|
||||
// larger program.
|
||||
//
|
||||
// The zero value is ready to use.
|
||||
type SubscriptionScope struct { |
||||
mu sync.Mutex |
||||
subs map[*scopeSub]struct{} |
||||
closed bool |
||||
} |
||||
|
||||
type scopeSub struct { |
||||
sc *SubscriptionScope |
||||
s Subscription |
||||
} |
||||
|
||||
// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
|
||||
// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
|
||||
// scope.
|
||||
func (sc *SubscriptionScope) Track(s Subscription) Subscription { |
||||
sc.mu.Lock() |
||||
defer sc.mu.Unlock() |
||||
if sc.closed { |
||||
return nil |
||||
} |
||||
if sc.subs == nil { |
||||
sc.subs = make(map[*scopeSub]struct{}) |
||||
} |
||||
ss := &scopeSub{sc, s} |
||||
sc.subs[ss] = struct{}{} |
||||
return ss |
||||
} |
||||
|
||||
// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
|
||||
// the tracked set. Calls to Track after Close return nil.
|
||||
func (sc *SubscriptionScope) Close() { |
||||
sc.mu.Lock() |
||||
defer sc.mu.Unlock() |
||||
if sc.closed { |
||||
return |
||||
} |
||||
sc.closed = true |
||||
for s := range sc.subs { |
||||
s.s.Unsubscribe() |
||||
} |
||||
sc.subs = nil |
||||
} |
||||
|
||||
// Count returns the number of tracked subscriptions.
|
||||
// It is meant to be used for debugging.
|
||||
func (sc *SubscriptionScope) Count() int { |
||||
sc.mu.Lock() |
||||
defer sc.mu.Unlock() |
||||
return len(sc.subs) |
||||
} |
||||
|
||||
func (s *scopeSub) Unsubscribe() { |
||||
s.s.Unsubscribe() |
||||
s.sc.mu.Lock() |
||||
defer s.sc.mu.Unlock() |
||||
delete(s.sc.subs, s) |
||||
} |
||||
|
||||
func (s *scopeSub) Err() <-chan error { |
||||
return s.s.Err() |
||||
} |
@ -0,0 +1,121 @@ |
||||
// Copyright 2017 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package event |
||||
|
||||
import ( |
||||
"errors" |
||||
"testing" |
||||
"time" |
||||
|
||||
"golang.org/x/net/context" |
||||
) |
||||
|
||||
var errInts = errors.New("error in subscribeInts") |
||||
|
||||
func subscribeInts(max, fail int, c chan<- int) Subscription { |
||||
return NewSubscription(func(quit <-chan struct{}) error { |
||||
for i := 0; i < max; i++ { |
||||
if i >= fail { |
||||
return errInts |
||||
} |
||||
select { |
||||
case c <- i: |
||||
case <-quit: |
||||
return nil |
||||
} |
||||
} |
||||
return nil |
||||
}) |
||||
} |
||||
|
||||
func TestNewSubscriptionError(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
channel := make(chan int) |
||||
sub := subscribeInts(10, 2, channel) |
||||
loop: |
||||
for want := 0; want < 10; want++ { |
||||
select { |
||||
case got := <-channel: |
||||
if got != want { |
||||
t.Fatalf("wrong int %d, want %d", got, want) |
||||
} |
||||
case err := <-sub.Err(): |
||||
if err != errInts { |
||||
t.Fatalf("wrong error: got %q, want %q", err, errInts) |
||||
} |
||||
if want != 2 { |
||||
t.Fatalf("got errInts at int %d, should be received at 2", want) |
||||
} |
||||
break loop |
||||
} |
||||
} |
||||
sub.Unsubscribe() |
||||
|
||||
err, ok := <-sub.Err() |
||||
if err != nil { |
||||
t.Fatal("got non-nil error after Unsubscribe") |
||||
} |
||||
if ok { |
||||
t.Fatal("channel still open after Unsubscribe") |
||||
} |
||||
} |
||||
|
||||
func TestResubscribe(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
var i int |
||||
nfails := 6 |
||||
sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) { |
||||
// fmt.Printf("call #%d @ %v\n", i, time.Now())
|
||||
i++ |
||||
if i == 2 { |
||||
// Delay the second failure a bit to reset the resubscribe interval.
|
||||
time.Sleep(200 * time.Millisecond) |
||||
} |
||||
if i < nfails { |
||||
return nil, errors.New("oops") |
||||
} |
||||
sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil }) |
||||
return sub, nil |
||||
}) |
||||
|
||||
<-sub.Err() |
||||
if i != nfails { |
||||
t.Fatalf("resubscribe function called %d times, want %d times", i, nfails) |
||||
} |
||||
} |
||||
|
||||
func TestResubscribeAbort(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
done := make(chan error) |
||||
sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
done <- nil |
||||
case <-time.After(2 * time.Second): |
||||
done <- errors.New("context given to resubscribe function not canceled within 2s") |
||||
} |
||||
return nil, nil |
||||
}) |
||||
|
||||
sub.Unsubscribe() |
||||
if err := <-done; err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
} |
Loading…
Reference in new issue