beacon/light/api: improve handling of event stream setup failures (#29308)

The StartHeadListener method will only be called once. So it can't just make one attempt
to connect to the eventsource endpoint, it has to keep trying. Note that once the stream
is established, the eventsource implementation itself will keep retrying.
pull/29339/head
Felix Lange 8 months ago committed by GitHub
parent 5cea7a6230
commit eda9cb7b36
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 106
      beacon/light/api/light_api.go

@ -17,11 +17,13 @@
package api package api
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/donovanhide/eventsource" "github.com/donovanhide/eventsource"
@ -416,39 +418,34 @@ type HeadEventListener struct {
// The callbacks are also called for the current head and optimistic head at startup. // The callbacks are also called for the current head and optimistic head at startup.
// They are never called concurrently. // They are never called concurrently.
func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() { func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() {
closeCh := make(chan struct{}) // initiate closing the stream var (
closedCh := make(chan struct{}) // stream closed (or failed to create) ctx, closeCtx = context.WithCancel(context.Background())
stoppedCh := make(chan struct{}) // sync loop stopped streamCh = make(chan *eventsource.Stream, 1)
streamCh := make(chan *eventsource.Stream, 1) wg sync.WaitGroup
)
// When connected to a Lodestar node the subscription blocks until the first actual
// event arrives; therefore we create the subscription in a separate goroutine while
// letting the main goroutine sync up to the current head.
wg.Add(1)
go func() { go func() {
defer close(closedCh) defer wg.Done()
// when connected to a Lodestar node the subscription blocks until the stream := api.startEventStream(ctx, &listener)
// first actual event arrives; therefore we create the subscription in if stream == nil {
// a separate goroutine while letting the main goroutine sync up to the // This case happens when the context was closed.
// current head
req, err := http.NewRequest("GET", api.url+
"/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update", nil)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
return
}
for k, v := range api.customHeaders {
req.Header.Set(k, v)
}
stream, err := eventsource.SubscribeWithRequest("", req)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
close(streamCh)
return return
} }
// Stream was opened, wait for close signal.
streamCh <- stream streamCh <- stream
<-closeCh <-ctx.Done()
stream.Close() stream.Close()
}() }()
wg.Add(1)
go func() { go func() {
defer close(stoppedCh) defer wg.Done()
// Request initial data.
if head, err := api.GetHeader(common.Hash{}); err == nil { if head, err := api.GetHeader(common.Hash{}); err == nil {
listener.OnNewHead(head.Slot, head.Hash()) listener.OnNewHead(head.Slot, head.Hash())
} }
@ -458,32 +455,42 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
if finalityUpdate, err := api.GetFinalityUpdate(); err == nil { if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
listener.OnFinality(finalityUpdate) listener.OnFinality(finalityUpdate)
} }
stream := <-streamCh
if stream == nil { // Receive the stream.
var stream *eventsource.Stream
select {
case stream = <-streamCh:
case <-ctx.Done():
return return
} }
for { for {
select { select {
case <-ctx.Done():
stream.Close()
case event, ok := <-stream.Events: case event, ok := <-stream.Events:
if !ok { if !ok {
return return
} }
switch event.Event() { switch event.Event() {
case "head": case "head":
if slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())); err == nil { slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
if err == nil {
listener.OnNewHead(slot, blockRoot) listener.OnNewHead(slot, blockRoot)
} else { } else {
listener.OnError(fmt.Errorf("error decoding head event: %v", err)) listener.OnError(fmt.Errorf("error decoding head event: %v", err))
} }
case "light_client_optimistic_update": case "light_client_optimistic_update":
if signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())); err == nil { signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data()))
if err == nil {
listener.OnSignedHead(signedHead) listener.OnSignedHead(signedHead)
} else { } else {
listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err)) listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
} }
case "light_client_finality_update": case "light_client_finality_update":
if finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data())); err == nil { finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data()))
if err == nil {
listener.OnFinality(finalityUpdate) listener.OnFinality(finalityUpdate)
} else { } else {
listener.OnError(fmt.Errorf("error decoding finality update event: %v", err)) listener.OnError(fmt.Errorf("error decoding finality update event: %v", err))
@ -491,6 +498,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
default: default:
listener.OnError(fmt.Errorf("unexpected event: %s", event.Event())) listener.OnError(fmt.Errorf("unexpected event: %s", event.Event()))
} }
case err, ok := <-stream.Errors: case err, ok := <-stream.Errors:
if !ok { if !ok {
return return
@ -499,9 +507,43 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
} }
} }
}() }()
return func() { return func() {
close(closeCh) closeCtx()
<-closedCh wg.Wait()
<-stoppedCh }
}
// startEventStream establishes an event stream. This will keep retrying until the stream has been
// established. It can only return nil when the context is canceled.
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update"
req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
continue
}
for k, v := range api.customHeaders {
req.Header.Set(k, v)
}
stream, err := eventsource.SubscribeWithRequest("", req)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
continue
}
return stream
}
return nil
}
func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-timer.C:
return true
case <-ctx.Done():
return false
} }
} }

Loading…
Cancel
Save