diff --git a/event/subscription.go b/event/subscription.go index 6c62874719..07e059c6db 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio backoffMax: backoffMax, fn: fn, err: make(chan error), - unsub: make(chan struct{}), + unsub: make(chan struct{}, 1), } go s.loop() return s diff --git a/event/subscription_test.go b/event/subscription_test.go index ba081705c4..743d0bf67d 100644 --- a/event/subscription_test.go +++ b/event/subscription_test.go @@ -154,3 +154,27 @@ func TestResubscribeWithErrorHandler(t *testing.T) { t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs) } } + +func TestResubscribeWithCompletedSubscription(t *testing.T) { + t.Parallel() + + quitProducerAck := make(chan struct{}) + quitProducer := make(chan struct{}) + + sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) { + return NewSubscription(func(unsubscribed <-chan struct{}) error { + select { + case <-quitProducer: + quitProducerAck <- struct{}{} + return nil + case <-unsubscribed: + return nil + } + }), nil + }) + + // Ensure producer has started and exited before Unsubscribe + close(quitProducer) + <-quitProducerAck + sub.Unsubscribe() +}