event, p2p/simulations/adapters: fix rare goroutine leaks (#20657)

Co-authored-by: Felix Lange <fjl@twurst.com>
pull/20932/head^2
Boqin Qin 5 years ago committed by GitHub
parent 46c4b699c8
commit a9614c3c91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      event/subscription.go
  2. 2
      event/subscription_test.go
  3. 2
      p2p/simulations/adapters/exec.go

@ -145,7 +145,6 @@ func (s *resubscribeSub) loop() {
func (s *resubscribeSub) subscribe() Subscription { func (s *resubscribeSub) subscribe() Subscription {
subscribed := make(chan error) subscribed := make(chan error)
var sub Subscription var sub Subscription
retry:
for { for {
s.lastTry = mclock.Now() s.lastTry = mclock.Now()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -157,19 +156,19 @@ retry:
select { select {
case err := <-subscribed: case err := <-subscribed:
cancel() cancel()
if err != nil { if err == nil {
// Subscribing failed, wait before launching the next try.
if s.backoffWait() {
return nil
}
continue retry
}
if sub == nil { if sub == nil {
panic("event: ResubscribeFunc returned nil subscription and no error") panic("event: ResubscribeFunc returned nil subscription and no error")
} }
return sub return sub
}
// Subscribing failed, wait before launching the next try.
if s.backoffWait() {
return nil // unsubscribed during wait
}
case <-s.unsub: case <-s.unsub:
cancel() cancel()
<-subscribed // avoid leaking the s.fn goroutine.
return nil return nil
} }
} }

@ -102,7 +102,7 @@ func TestResubscribe(t *testing.T) {
func TestResubscribeAbort(t *testing.T) { func TestResubscribeAbort(t *testing.T) {
t.Parallel() t.Parallel()
done := make(chan error) done := make(chan error, 1)
sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) { sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():

@ -287,7 +287,7 @@ func (n *ExecNode) Stop() error {
if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil { if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
return n.Cmd.Process.Kill() return n.Cmd.Process.Kill()
} }
waitErr := make(chan error) waitErr := make(chan error, 1)
go func() { go func() {
waitErr <- n.Cmd.Wait() waitErr <- n.Cmd.Wait()
}() }()

Loading…
Cancel
Save