event: add ResubscribeErr (#22191)

This adds a way to get the error of the failing subscription
for logging/debugging purposes.

Co-authored-by: Felix Lange <fjl@twurst.com>
pull/22211/head
Łukasz Zimnoch 4 years ago committed by GitHub
parent c4307a9339
commit 231040c633
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      event/subscription.go
  2. 36
      event/subscription_test.go

@ -95,6 +95,26 @@ func (s *funcSub) Err() <-chan error {
// Resubscribe applies backoff between calls to fn. The time between calls is adapted
// based on the error rate, but will never exceed backoffMax.
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
return ResubscribeErr(backoffMax, func(ctx context.Context, _ error) (Subscription, error) {
return fn(ctx)
})
}
// A ResubscribeFunc attempts to establish a subscription.
type ResubscribeFunc func(context.Context) (Subscription, error)
// ResubscribeErr calls fn repeatedly to keep a subscription established. When the
// subscription is established, ResubscribeErr waits for it to fail and calls fn again. This
// process repeats until Unsubscribe is called or the active subscription ends
// successfully.
//
// The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr,
// the error of the failing subscription is available to the callback for logging
// purposes.
//
// ResubscribeErr applies backoff between calls to fn. The time between calls is adapted
// based on the error rate, but will never exceed backoffMax.
func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription {
s := &resubscribeSub{
waitTime: backoffMax / 10,
backoffMax: backoffMax,
@ -106,15 +126,18 @@ func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
return s
}
// A ResubscribeFunc attempts to establish a subscription.
type ResubscribeFunc func(context.Context) (Subscription, error)
// A ResubscribeErrFunc attempts to establish a subscription.
// For every call but the first, the second argument to this function is
// the error that occurred with the previous subscription.
type ResubscribeErrFunc func(context.Context, error) (Subscription, error)
type resubscribeSub struct {
fn ResubscribeFunc
fn ResubscribeErrFunc
err chan error
unsub chan struct{}
unsubOnce sync.Once
lastTry mclock.AbsTime
lastSubErr error
waitTime, backoffMax time.Duration
}
@ -149,7 +172,7 @@ func (s *resubscribeSub) subscribe() Subscription {
s.lastTry = mclock.Now()
ctx, cancel := context.WithCancel(context.Background())
go func() {
rsub, err := s.fn(ctx)
rsub, err := s.fn(ctx, s.lastSubErr)
sub = rsub
subscribed <- err
}()
@ -178,6 +201,7 @@ func (s *resubscribeSub) waitForError(sub Subscription) bool {
defer sub.Unsubscribe()
select {
case err := <-sub.Err():
s.lastSubErr = err
return err == nil
case <-s.unsub:
return true

@ -19,6 +19,8 @@ package event
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"
)
@ -118,3 +120,37 @@ func TestResubscribeAbort(t *testing.T) {
t.Fatal(err)
}
}
func TestResubscribeWithErrorHandler(t *testing.T) {
t.Parallel()
var i int
nfails := 6
subErrs := make([]string, 0)
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
i++
var lastErrVal string
if lastErr != nil {
lastErrVal = lastErr.Error()
}
subErrs = append(subErrs, lastErrVal)
sub := NewSubscription(func(unsubscribed <-chan struct{}) error {
if i < nfails {
return fmt.Errorf("err-%v", i)
} else {
return nil
}
})
return sub, nil
})
<-sub.Err()
if i != nfails {
t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
}
expectedSubErrs := []string{"", "err-1", "err-2", "err-3", "err-4", "err-5"}
if !reflect.DeepEqual(subErrs, expectedSubErrs) {
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
}
}

Loading…
Cancel
Save