diff --git a/event/event.go b/event/event.go index 344d1e3f64..d11a0e9bdb 100644 --- a/event/event.go +++ b/event/event.go @@ -23,6 +23,8 @@ type Subscription interface { // A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. +// +// The zero value is ready to use. type TypeMux struct { mutex sync.RWMutex subm map[reflect.Type][]*muxsub @@ -32,11 +34,6 @@ type TypeMux struct { // ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") -// NewTypeMux creates a running mux. -func NewTypeMux() *TypeMux { - return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} -} - // Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. @@ -44,9 +41,11 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() if mux.stopped { - mux.mutex.Unlock() close(sub.postC) } else { + if mux.subm == nil { + mux.subm = make(map[reflect.Type][]*muxsub) + } for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] @@ -55,8 +54,8 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } - mux.mutex.Unlock() } + mux.mutex.Unlock() return sub } diff --git a/event/event_test.go b/event/event_test.go index 385bd70b79..f65aaa0a21 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -10,7 +10,7 @@ import ( type testEvent int func TestSub(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -28,7 +28,7 @@ func TestSub(t *testing.T) { } func TestMuxErrorAfterStop(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -41,7 +41,7 @@ func TestMuxErrorAfterStop(t *testing.T) { } func TestUnsubscribeUnblockPost(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -62,7 +62,7 @@ func TestUnsubscribeUnblockPost(t *testing.T) { func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() recv := make(chan int) @@ -111,7 +111,7 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) { } func BenchmarkPost3(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) @@ -123,7 +123,7 @@ func BenchmarkPost3(b *testing.B) { } func BenchmarkPostConcurrent(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0))