@ -25,33 +25,22 @@ import (
"time"
)
// Event is a time-tagged notification pushed to subscribers.
type Event struct {
// TypeMux Event is a time-tagged notification pushed to subscribers.
type TypeMux Event struct {
Time time . Time
Data interface { }
}
// Subscription is implemented by event subscriptions.
type Subscription interface {
// Chan returns a channel that carries events.
// Implementations should return the same channel
// for any subsequent calls to Chan.
Chan ( ) <- chan * Event
// Unsubscribe stops delivery of events to a subscription.
// The event channel is closed.
// Unsubscribe can be called more than once.
Unsubscribe ( )
}
// A TypeMux dispatches events to registered receivers. Receivers can be
// registered to handle events of certain type. Any operation
// called after mux is stopped will return ErrMuxClosed.
//
// The zero value is ready to use.
//
// Deprecated: use Feed
type TypeMux struct {
mutex sync . RWMutex
subm map [ reflect . Type ] [ ] * muxsub
subm map [ reflect . Type ] [ ] * TypeMuxSubscription
stopped bool
}
@ -61,7 +50,7 @@ var ErrMuxClosed = errors.New("event: mux closed")
// Subscribe creates a subscription for events of the given types. The
// subscription's channel is closed when it is unsubscribed
// or the mux is closed.
func ( mux * TypeMux ) Subscribe ( types ... interface { } ) Subscription {
func ( mux * TypeMux ) Subscribe ( types ... interface { } ) * TypeMux Subscription {
sub := newsub ( mux )
mux . mutex . Lock ( )
defer mux . mutex . Unlock ( )
@ -72,7 +61,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
close ( sub . postC )
} else {
if mux . subm == nil {
mux . subm = make ( map [ reflect . Type ] [ ] * muxsub )
mux . subm = make ( map [ reflect . Type ] [ ] * TypeMuxSubscription )
}
for _ , t := range types {
rtyp := reflect . TypeOf ( t )
@ -80,7 +69,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
if find ( oldsubs , sub ) != - 1 {
panic ( fmt . Sprintf ( "event: duplicate type %s in Subscribe" , rtyp ) )
}
subs := make ( [ ] * muxsub , len ( oldsubs ) + 1 )
subs := make ( [ ] * TypeMuxSubscription , len ( oldsubs ) + 1 )
copy ( subs , oldsubs )
subs [ len ( oldsubs ) ] = sub
mux . subm [ rtyp ] = subs
@ -92,7 +81,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
func ( mux * TypeMux ) Post ( ev interface { } ) error {
event := & Event {
event := & TypeMux Event{
Time : time . Now ( ) ,
Data : ev ,
}
@ -125,7 +114,7 @@ func (mux *TypeMux) Stop() {
mux . mutex . Unlock ( )
}
func ( mux * TypeMux ) del ( s * muxsub ) {
func ( mux * TypeMux ) del ( s * TypeMuxSubscription ) {
mux . mutex . Lock ( )
for typ , subs := range mux . subm {
if pos := find ( subs , s ) ; pos >= 0 {
@ -139,7 +128,7 @@ func (mux *TypeMux) del(s *muxsub) {
s . mux . mutex . Unlock ( )
}
func find ( slice [ ] * muxsub , item * muxsub ) int {
func find ( slice [ ] * TypeMuxSubscription , item * TypeMuxSubscription ) int {
for i , v := range slice {
if v == item {
return i
@ -148,14 +137,15 @@ func find(slice []*muxsub, item *muxsub) int {
return - 1
}
func posdelete ( slice [ ] * muxsub , pos int ) [ ] * muxsub {
news := make ( [ ] * muxsub , len ( slice ) - 1 )
func posdelete ( slice [ ] * TypeMuxSubscription , pos int ) [ ] * TypeMuxSubscription {
news := make ( [ ] * TypeMuxSubscription , len ( slice ) - 1 )
copy ( news [ : pos ] , slice [ : pos ] )
copy ( news [ pos : ] , slice [ pos + 1 : ] )
return news
}
type muxsub struct {
// TypeMuxSubscription is a subscription established through TypeMux.
type TypeMuxSubscription struct {
mux * TypeMux
created time . Time
closeMu sync . Mutex
@ -166,13 +156,13 @@ type muxsub struct {
// postC can be set to nil without affecting the return value of
// Chan.
postMu sync . RWMutex
readC <- chan * Event
postC chan <- * Event
readC <- chan * TypeMux Event
postC chan <- * TypeMux Event
}
func newsub ( mux * TypeMux ) * muxsub {
c := make ( chan * Event )
return & muxsub {
func newsub ( mux * TypeMux ) * TypeMuxSubscription {
c := make ( chan * TypeMux Event)
return & TypeMuxSubscription {
mux : mux ,
created : time . Now ( ) ,
readC : c ,
@ -181,16 +171,16 @@ func newsub(mux *TypeMux) *muxsub {
}
}
func ( s * muxsub ) Chan ( ) <- chan * Event {
func ( s * TypeMuxSubscription ) Chan ( ) <- chan * TypeMux Event {
return s . readC
}
func ( s * muxsub ) Unsubscribe ( ) {
func ( s * TypeMuxSubscription ) Unsubscribe ( ) {
s . mux . del ( s )
s . closewait ( )
}
func ( s * muxsub ) closewait ( ) {
func ( s * TypeMuxSubscription ) closewait ( ) {
s . closeMu . Lock ( )
defer s . closeMu . Unlock ( )
if s . closed {
@ -205,7 +195,7 @@ func (s *muxsub) closewait() {
s . postMu . Unlock ( )
}
func ( s * muxsub ) deliver ( event * Event ) {
func ( s * TypeMuxSubscription ) deliver ( event * TypeMux Event) {
// Short circuit delivery if stale event
if s . created . After ( event . Time ) {
return