|
|
|
@ -57,7 +57,8 @@ func (e feedTypeError) Error() string { |
|
|
|
|
return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *Feed) init() { |
|
|
|
|
func (f *Feed) init(etype reflect.Type) { |
|
|
|
|
f.etype = etype |
|
|
|
|
f.removeSub = make(chan interface{}) |
|
|
|
|
f.sendLock = make(chan struct{}, 1) |
|
|
|
|
f.sendLock <- struct{}{} |
|
|
|
@ -70,8 +71,6 @@ func (f *Feed) init() { |
|
|
|
|
// The channel should have ample buffer space to avoid blocking other subscribers.
|
|
|
|
|
// Slow subscribers are not dropped.
|
|
|
|
|
func (f *Feed) Subscribe(channel interface{}) Subscription { |
|
|
|
|
f.once.Do(f.init) |
|
|
|
|
|
|
|
|
|
chanval := reflect.ValueOf(channel) |
|
|
|
|
chantyp := chanval.Type() |
|
|
|
|
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { |
|
|
|
@ -79,11 +78,13 @@ func (f *Feed) Subscribe(channel interface{}) Subscription { |
|
|
|
|
} |
|
|
|
|
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} |
|
|
|
|
|
|
|
|
|
f.mu.Lock() |
|
|
|
|
defer f.mu.Unlock() |
|
|
|
|
if !f.typecheck(chantyp.Elem()) { |
|
|
|
|
f.once.Do(func() { f.init(chantyp.Elem()) }) |
|
|
|
|
if f.etype != chantyp.Elem() { |
|
|
|
|
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
f.mu.Lock() |
|
|
|
|
defer f.mu.Unlock() |
|
|
|
|
// Add the select case to the inbox.
|
|
|
|
|
// The next Send will add it to f.sendCases.
|
|
|
|
|
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} |
|
|
|
@ -91,15 +92,6 @@ func (f *Feed) Subscribe(channel interface{}) Subscription { |
|
|
|
|
return sub |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// note: callers must hold f.mu
|
|
|
|
|
func (f *Feed) typecheck(typ reflect.Type) bool { |
|
|
|
|
if f.etype == nil { |
|
|
|
|
f.etype = typ |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
return f.etype == typ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *Feed) remove(sub *feedSub) { |
|
|
|
|
// Delete from inbox first, which covers channels
|
|
|
|
|
// that have not been added to f.sendCases yet.
|
|
|
|
@ -128,19 +120,17 @@ func (f *Feed) remove(sub *feedSub) { |
|
|
|
|
func (f *Feed) Send(value interface{}) (nsent int) { |
|
|
|
|
rvalue := reflect.ValueOf(value) |
|
|
|
|
|
|
|
|
|
f.once.Do(f.init) |
|
|
|
|
f.once.Do(func() { f.init(rvalue.Type()) }) |
|
|
|
|
if f.etype != rvalue.Type() { |
|
|
|
|
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
<-f.sendLock |
|
|
|
|
|
|
|
|
|
// Add new cases from the inbox after taking the send lock.
|
|
|
|
|
f.mu.Lock() |
|
|
|
|
f.sendCases = append(f.sendCases, f.inbox...) |
|
|
|
|
f.inbox = nil |
|
|
|
|
|
|
|
|
|
if !f.typecheck(rvalue.Type()) { |
|
|
|
|
f.sendLock <- struct{}{} |
|
|
|
|
f.mu.Unlock() |
|
|
|
|
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) |
|
|
|
|
} |
|
|
|
|
f.mu.Unlock() |
|
|
|
|
|
|
|
|
|
// Set the sent value on all channels.
|
|
|
|
|