beacon/light: fix shutdown issues (#29946)

* beacon/light/request: add server test for event after unsubscribe

* beacon/light/api: fixed double stream.Close()

* beacon/light/request: add checks for nil event callback function

* beacon/light/request: unlock server mutex while unsubscribing from parent
pull/29907/head^2
Felföldi Zsolt 5 months ago committed by GitHub
parent 69351e8b0f
commit 86150af2e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      beacon/light/api/light_api.go
  2. 31
      beacon/light/request/server.go
  3. 31
      beacon/light/request/server_test.go

@ -494,9 +494,6 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
for { for {
select { select {
case <-ctx.Done():
stream.Close()
case event, ok := <-stream.Events: case event, ok := <-stream.Events:
if !ok { if !ok {
log.Trace("Event stream closed") log.Trace("Event stream closed")

@ -186,10 +186,14 @@ func (s *serverWithTimeout) eventCallback(event Event) {
// call will just do nothing // call will just do nothing
timer.Stop() timer.Stop()
delete(s.timeouts, id) delete(s.timeouts, id)
s.childEventCb(event) if s.childEventCb != nil {
s.childEventCb(event)
}
} }
default: default:
s.childEventCb(event) if s.childEventCb != nil {
s.childEventCb(event)
}
} }
} }
@ -211,25 +215,27 @@ func (s *serverWithTimeout) startTimeout(reqData RequestResponse) {
delete(s.timeouts, id) delete(s.timeouts, id)
childEventCb := s.childEventCb childEventCb := s.childEventCb
s.lock.Unlock() s.lock.Unlock()
childEventCb(Event{Type: EvFail, Data: reqData}) if childEventCb != nil {
childEventCb(Event{Type: EvFail, Data: reqData})
}
}) })
childEventCb := s.childEventCb childEventCb := s.childEventCb
s.lock.Unlock() s.lock.Unlock()
childEventCb(Event{Type: EvTimeout, Data: reqData}) if childEventCb != nil {
childEventCb(Event{Type: EvTimeout, Data: reqData})
}
}) })
} }
// unsubscribe stops all goroutines associated with the server. // unsubscribe stops all goroutines associated with the server.
func (s *serverWithTimeout) unsubscribe() { func (s *serverWithTimeout) unsubscribe() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
for _, timer := range s.timeouts { for _, timer := range s.timeouts {
if timer != nil { if timer != nil {
timer.Stop() timer.Stop()
} }
} }
s.childEventCb = nil s.lock.Unlock()
s.parent.Unsubscribe() s.parent.Unsubscribe()
} }
@ -328,10 +334,10 @@ func (s *serverWithLimits) eventCallback(event Event) {
} }
childEventCb := s.childEventCb childEventCb := s.childEventCb
s.lock.Unlock() s.lock.Unlock()
if passEvent { if passEvent && childEventCb != nil {
childEventCb(event) childEventCb(event)
} }
if sendCanRequestAgain { if sendCanRequestAgain && childEventCb != nil {
childEventCb(Event{Type: EvCanRequestAgain}) childEventCb(Event{Type: EvCanRequestAgain})
} }
} }
@ -347,13 +353,12 @@ func (s *serverWithLimits) sendRequest(request Request) (reqId ID) {
// unsubscribe stops all goroutines associated with the server. // unsubscribe stops all goroutines associated with the server.
func (s *serverWithLimits) unsubscribe() { func (s *serverWithLimits) unsubscribe() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if s.delayTimer != nil { if s.delayTimer != nil {
s.delayTimer.Stop() s.delayTimer.Stop()
s.delayTimer = nil s.delayTimer = nil
} }
s.childEventCb = nil s.childEventCb = nil
s.lock.Unlock()
s.serverWithTimeout.unsubscribe() s.serverWithTimeout.unsubscribe()
} }
@ -383,7 +388,7 @@ func (s *serverWithLimits) canRequestNow() bool {
} }
childEventCb := s.childEventCb childEventCb := s.childEventCb
s.lock.Unlock() s.lock.Unlock()
if sendCanRequestAgain { if sendCanRequestAgain && childEventCb != nil {
childEventCb(Event{Type: EvCanRequestAgain}) childEventCb(Event{Type: EvCanRequestAgain})
} }
return canRequest return canRequest
@ -415,7 +420,7 @@ func (s *serverWithLimits) delay(delay time.Duration) {
} }
childEventCb := s.childEventCb childEventCb := s.childEventCb
s.lock.Unlock() s.lock.Unlock()
if sendCanRequestAgain { if sendCanRequestAgain && childEventCb != nil {
childEventCb(Event{Type: EvCanRequestAgain}) childEventCb(Event{Type: EvCanRequestAgain})
} }
}) })

@ -51,6 +51,7 @@ func TestServerEvents(t *testing.T) {
expEvent(EvFail) expEvent(EvFail)
rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}}) rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
expEvent(nil) expEvent(nil)
srv.unsubscribe()
} }
func TestServerParallel(t *testing.T) { func TestServerParallel(t *testing.T) {
@ -129,9 +130,7 @@ func TestServerEventRateLimit(t *testing.T) {
srv := NewServer(rs, clock) srv := NewServer(rs, clock)
var eventCount int var eventCount int
srv.subscribe(func(event Event) { srv.subscribe(func(event Event) {
if !event.IsRequestEvent() { eventCount++
eventCount++
}
}) })
expEvents := func(send, expAllowed int) { expEvents := func(send, expAllowed int) {
eventCount = 0 eventCount = 0
@ -147,6 +146,30 @@ func TestServerEventRateLimit(t *testing.T) {
expEvents(5, 1) expEvents(5, 1)
clock.Run(maxServerEventRate * maxServerEventBuffer * 2) clock.Run(maxServerEventRate * maxServerEventBuffer * 2)
expEvents(maxServerEventBuffer+5, maxServerEventBuffer) expEvents(maxServerEventBuffer+5, maxServerEventBuffer)
srv.unsubscribe()
}
func TestServerUnsubscribe(t *testing.T) {
rs := &testRequestServer{}
clock := &mclock.Simulated{}
srv := NewServer(rs, clock)
var eventCount int
srv.subscribe(func(event Event) {
eventCount++
})
eventCb := rs.eventCb
eventCb(Event{Type: testEventType})
if eventCount != 1 {
t.Errorf("Server event callback not called before unsubscribe")
}
srv.unsubscribe()
if rs.eventCb != nil {
t.Errorf("Server event callback not removed after unsubscribe")
}
eventCb(Event{Type: testEventType})
if eventCount != 1 {
t.Errorf("Server event callback called after unsubscribe")
}
} }
type testRequestServer struct { type testRequestServer struct {
@ -156,4 +179,4 @@ type testRequestServer struct {
func (rs *testRequestServer) Name() string { return "" } func (rs *testRequestServer) Name() string { return "" }
func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb } func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb }
func (rs *testRequestServer) SendRequest(ID, Request) {} func (rs *testRequestServer) SendRequest(ID, Request) {}
func (rs *testRequestServer) Unsubscribe() {} func (rs *testRequestServer) Unsubscribe() { rs.eventCb = nil }

Loading…
Cancel
Save