From e6689fe090cc56cb3f0c1948c5e5356ea1d20c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Mon, 22 Apr 2024 13:19:42 +0200 Subject: [PATCH] beacon/light/sync: print error log if checkpoint retrieval fails (#29532) Co-authored-by: Felix Lange --- beacon/blsync/block_sync_test.go | 10 +- beacon/light/api/api_server.go | 9 +- beacon/light/api/light_api.go | 17 +-- beacon/light/request/scheduler.go | 4 +- beacon/light/request/scheduler_test.go | 4 + beacon/light/request/server.go | 7 ++ beacon/light/request/server_test.go | 1 + beacon/light/sync/head_sync_test.go | 14 ++- beacon/light/sync/test_helpers.go | 4 +- beacon/light/sync/types.go | 6 +- beacon/light/sync/update_sync.go | 139 +++++++++++++++++++++---- 11 files changed, 175 insertions(+), 40 deletions(-) diff --git a/beacon/blsync/block_sync_test.go b/beacon/blsync/block_sync_test.go index 73ae89ae73..0525e95a89 100644 --- a/beacon/blsync/block_sync_test.go +++ b/beacon/blsync/block_sync_test.go @@ -28,8 +28,8 @@ import ( ) var ( - testServer1 = "testServer1" - testServer2 = "testServer2" + testServer1 = testServer("testServer1") + testServer2 = testServer("testServer2") testBlock1 = types.NewBeaconBlock(&deneb.BeaconBlock{ Slot: 123, @@ -51,6 +51,12 @@ var ( }) ) +type testServer string + +func (t testServer) Name() string { + return string(t) +} + func TestBlockSync(t *testing.T) { ht := &testHeadTracker{} blockSync := newBeaconBlockSync(ht) diff --git a/beacon/light/api/api_server.go b/beacon/light/api/api_server.go index da044f4b2d..4b885cb8e1 100755 --- a/beacon/light/api/api_server.go +++ b/beacon/light/api/api_server.go @@ -73,8 +73,10 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) { r.Updates, r.Committees, err = s.api.GetBestUpdatesAndCommittees(data.FirstPeriod, data.Count) resp = r case sync.ReqHeader: + var r sync.RespHeader log.Debug("Beacon API: requesting header", "reqid", id, "hash", common.Hash(data)) - resp, err = s.api.GetHeader(common.Hash(data)) + r.Header, r.Canonical, r.Finalized, err = s.api.GetHeader(common.Hash(data)) + resp = r case sync.ReqCheckpointData: log.Debug("Beacon API: requesting checkpoint data", "reqid", id, "hash", common.Hash(data)) resp, err = s.api.GetCheckpointData(common.Hash(data)) @@ -101,3 +103,8 @@ func (s *ApiServer) Unsubscribe() { s.unsubscribe = nil } } + +// Name implements request.Server +func (s *ApiServer) Name() string { + return s.api.url +} diff --git a/beacon/light/api/light_api.go b/beacon/light/api/light_api.go index ceb4261c3c..6892407caf 100755 --- a/beacon/light/api/light_api.go +++ b/beacon/light/api/light_api.go @@ -291,7 +291,9 @@ func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) { // GetHeader fetches and validates the beacon header with the given blockRoot. // If blockRoot is null hash then the latest head header is fetched. -func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error) { +// The values of the canonical and finalized flags are also returned. Note that +// these flags are not validated. +func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) { var blockId string if blockRoot == (common.Hash{}) { blockId = "head" @@ -300,11 +302,12 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error } resp, err := api.httpGetf("/eth/v1/beacon/headers/%s", blockId) if err != nil { - return types.Header{}, err + return types.Header{}, false, false, err } var data struct { - Data struct { + Finalized bool `json:"finalized"` + Data struct { Root common.Hash `json:"root"` Canonical bool `json:"canonical"` Header struct { @@ -314,16 +317,16 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error } `json:"data"` } if err := json.Unmarshal(resp, &data); err != nil { - return types.Header{}, err + return types.Header{}, false, false, err } header := data.Data.Header.Message if blockRoot == (common.Hash{}) { blockRoot = data.Data.Root } if header.Hash() != blockRoot { - return types.Header{}, errors.New("retrieved beacon header root does not match") + return types.Header{}, false, false, errors.New("retrieved beacon header root does not match") } - return header, nil + return header, data.Data.Canonical, data.Finalized, nil } // GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint. @@ -446,7 +449,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() defer wg.Done() // Request initial data. - if head, err := api.GetHeader(common.Hash{}); err == nil { + if head, _, _, err := api.GetHeader(common.Hash{}); err == nil { listener.OnNewHead(head.Slot, head.Hash()) } if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil { diff --git a/beacon/light/request/scheduler.go b/beacon/light/request/scheduler.go index 4b8f6ce570..e80daf805e 100644 --- a/beacon/light/request/scheduler.go +++ b/beacon/light/request/scheduler.go @@ -93,7 +93,9 @@ type ( // the modules that do not interact with them directly. // In order to make module testing easier, Server interface is used in // events and modules. - Server any + Server interface { + Name() string + } Request any Response any ID uint64 diff --git a/beacon/light/request/scheduler_test.go b/beacon/light/request/scheduler_test.go index 7d5a567078..5cd4965644 100644 --- a/beacon/light/request/scheduler_test.go +++ b/beacon/light/request/scheduler_test.go @@ -70,6 +70,10 @@ type testServer struct { canRequest int } +func (s *testServer) Name() string { + return "" +} + func (s *testServer) subscribe(eventCb func(Event)) { s.eventCb = eventCb } diff --git a/beacon/light/request/server.go b/beacon/light/request/server.go index bcb8744b38..9f3b09b81e 100644 --- a/beacon/light/request/server.go +++ b/beacon/light/request/server.go @@ -58,6 +58,7 @@ const ( // EvResponse or EvFail. Additionally, it may also send application-defined // events that the Modules can interpret. type requestServer interface { + Name() string Subscribe(eventCallback func(Event)) SendRequest(ID, Request) Unsubscribe() @@ -69,6 +70,7 @@ type requestServer interface { // limit the number of parallel in-flight requests and temporarily disable // new requests based on timeouts and response failures. type server interface { + Server subscribe(eventCallback func(Event)) canRequestNow() bool sendRequest(Request) ID @@ -138,6 +140,11 @@ type serverWithTimeout struct { lastID ID } +// Name implements request.Server +func (s *serverWithTimeout) Name() string { + return s.parent.Name() +} + // init initializes serverWithTimeout func (s *serverWithTimeout) init(clock mclock.Clock) { s.clock = clock diff --git a/beacon/light/request/server_test.go b/beacon/light/request/server_test.go index b6b9edf9a0..38629cb8c4 100644 --- a/beacon/light/request/server_test.go +++ b/beacon/light/request/server_test.go @@ -153,6 +153,7 @@ type testRequestServer struct { eventCb func(Event) } +func (rs *testRequestServer) Name() string { return "" } func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb } func (rs *testRequestServer) SendRequest(ID, Request) {} func (rs *testRequestServer) Unsubscribe() {} diff --git a/beacon/light/sync/head_sync_test.go b/beacon/light/sync/head_sync_test.go index 2f75487f16..a2870b2732 100644 --- a/beacon/light/sync/head_sync_test.go +++ b/beacon/light/sync/head_sync_test.go @@ -24,10 +24,10 @@ import ( ) var ( - testServer1 = "testServer1" - testServer2 = "testServer2" - testServer3 = "testServer3" - testServer4 = "testServer4" + testServer1 = testServer("testServer1") + testServer2 = testServer("testServer2") + testServer3 = testServer("testServer3") + testServer4 = testServer("testServer4") testHead0 = types.HeadInfo{} testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}} @@ -42,6 +42,12 @@ var ( testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}} ) +type testServer string + +func (t testServer) Name() string { + return string(t) +} + func TestValidatedHead(t *testing.T) { chain := &TestCommitteeChain{} ht := &TestHeadTracker{} diff --git a/beacon/light/sync/test_helpers.go b/beacon/light/sync/test_helpers.go index a1ca2b5909..9f57ceebe4 100644 --- a/beacon/light/sync/test_helpers.go +++ b/beacon/light/sync/test_helpers.go @@ -75,7 +75,7 @@ func (ts *TestScheduler) Run(testIndex int, exp ...any) { if count == 0 { continue } - ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.(string), testIndex) + ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.Name(), testIndex) } if !reflect.DeepEqual(ts.sent[testIndex], expReqs) { @@ -104,7 +104,7 @@ func (ts *TestScheduler) Send(server request.Server, req request.Request) reques func (ts *TestScheduler) Fail(server request.Server, desc string) { if ts.expFail[server] == 0 { - ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.(string), ts.testIndex, desc) + ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.Name(), ts.testIndex, desc) return } ts.expFail[server]-- diff --git a/beacon/light/sync/types.go b/beacon/light/sync/types.go index 6449ae842d..8aa4c95f46 100644 --- a/beacon/light/sync/types.go +++ b/beacon/light/sync/types.go @@ -36,7 +36,11 @@ type ( Updates []*types.LightClientUpdate Committees []*types.SerializedSyncCommittee } - ReqHeader common.Hash + ReqHeader common.Hash + RespHeader struct { + Header types.Header + Canonical, Finalized bool + } ReqCheckpointData common.Hash ReqBeaconBlock common.Hash ) diff --git a/beacon/light/sync/update_sync.go b/beacon/light/sync/update_sync.go index 533e470fb0..71801b1b60 100644 --- a/beacon/light/sync/update_sync.go +++ b/beacon/light/sync/update_sync.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/beacon/light" "github.com/ethereum/go-ethereum/beacon/light/request" + "github.com/ethereum/go-ethereum/beacon/params" "github.com/ethereum/go-ethereum/beacon/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -42,6 +43,31 @@ type CheckpointInit struct { checkpointHash common.Hash locked request.ServerAndID initialized bool + // per-server state is used to track the state of requesting checkpoint header + // info. Part of this info (canonical and finalized state) is not validated + // and therefore it is requested from each server separately after it has + // reported a missing checkpoint (which is also not validated info). + serverState map[request.Server]serverState + // the following fields are used to determine whether the checkpoint is on + // epoch boundary. This information is validated and therefore stored globally. + parentHash common.Hash + hasEpochInfo, epochBoundary bool + cpSlot, parentSlot uint64 +} + +const ( + ssDefault = iota // no action yet or checkpoint requested + ssNeedHeader // checkpoint req failed, need cp header + ssHeaderRequested // cp header requested + ssNeedParent // cp header slot %32 != 0, need parent to check epoch boundary + ssParentRequested // cp parent header requested + ssPrintStatus // has all necessary info, print log message if init still not successful + ssDone // log message printed, no more action required +) + +type serverState struct { + state int + hasHeader, canonical, finalized bool // stored per server because not validated } // NewCheckpointInit creates a new CheckpointInit. @@ -49,40 +75,109 @@ func NewCheckpointInit(chain committeeChain, checkpointHash common.Hash) *Checkp return &CheckpointInit{ chain: chain, checkpointHash: checkpointHash, + serverState: make(map[request.Server]serverState), } } // Process implements request.Module. func (s *CheckpointInit) Process(requester request.Requester, events []request.Event) { + if s.initialized { + return + } for _, event := range events { - if !event.IsRequestEvent() { - continue - } - sid, req, resp := event.RequestInfo() - if s.locked == sid { - s.locked = request.ServerAndID{} - } - if resp != nil { - if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) { - s.chain.CheckpointInit(*checkpoint) - s.initialized = true - return + switch event.Type { + case request.EvResponse, request.EvFail, request.EvTimeout: + sid, req, resp := event.RequestInfo() + if s.locked == sid { + s.locked = request.ServerAndID{} } - - requester.Fail(event.Server, "invalid checkpoint data") + if event.Type == request.EvTimeout { + continue + } + switch s.serverState[sid.Server].state { + case ssDefault: + if resp != nil { + if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) { + s.chain.CheckpointInit(*checkpoint) + s.initialized = true + return + } + requester.Fail(event.Server, "invalid checkpoint data") + } + s.serverState[sid.Server] = serverState{state: ssNeedHeader} + case ssHeaderRequested: + if resp == nil { + s.serverState[sid.Server] = serverState{state: ssPrintStatus} + continue + } + newState := serverState{ + hasHeader: true, + canonical: resp.(RespHeader).Canonical, + finalized: resp.(RespHeader).Finalized, + } + s.cpSlot, s.parentHash = resp.(RespHeader).Header.Slot, resp.(RespHeader).Header.ParentRoot + if s.cpSlot%params.EpochLength == 0 { + s.hasEpochInfo, s.epochBoundary = true, true + } + if s.hasEpochInfo { + newState.state = ssPrintStatus + } else { + newState.state = ssNeedParent + } + s.serverState[sid.Server] = newState + case ssParentRequested: + s.parentSlot = resp.(RespHeader).Header.Slot + s.hasEpochInfo, s.epochBoundary = true, s.cpSlot/params.EpochLength > s.parentSlot/params.EpochLength + newState := s.serverState[sid.Server] + newState.state = ssPrintStatus + s.serverState[sid.Server] = newState + } + case request.EvUnregistered: + delete(s.serverState, event.Server) } } // start a request if possible - if s.initialized || s.locked != (request.ServerAndID{}) { - return + for _, server := range requester.CanSendTo() { + switch s.serverState[server].state { + case ssDefault: + if s.locked == (request.ServerAndID{}) { + id := requester.Send(server, ReqCheckpointData(s.checkpointHash)) + s.locked = request.ServerAndID{Server: server, ID: id} + } + case ssNeedHeader: + requester.Send(server, ReqHeader(s.checkpointHash)) + newState := s.serverState[server] + newState.state = ssHeaderRequested + s.serverState[server] = newState + case ssNeedParent: + requester.Send(server, ReqHeader(s.parentHash)) + newState := s.serverState[server] + newState.state = ssParentRequested + s.serverState[server] = newState + } } - cs := requester.CanSendTo() - if len(cs) == 0 { - return + // print log message if necessary + for server, state := range s.serverState { + if state.state != ssPrintStatus { + continue + } + switch { + case !state.hasHeader: + log.Error("blsync: checkpoint block is not available, reported as unknown", "server", server.Name()) + case !state.canonical: + log.Error("blsync: checkpoint block is not available, reported as non-canonical", "server", server.Name()) + case !s.hasEpochInfo: + // should be available if hasHeader is true and state is ssPrintStatus + panic("checkpoint epoch info not available when printing retrieval status") + case !s.epochBoundary: + log.Error("blsync: checkpoint block is not first of epoch", "slot", s.cpSlot, "parent", s.parentSlot, "server", server.Name()) + case !state.finalized: + log.Error("blsync: checkpoint block is reported as non-finalized", "server", server.Name()) + default: + log.Error("blsync: checkpoint not available, but reported as finalized; specified checkpoint hash might be too old", "server", server.Name()) + } + s.serverState[server] = serverState{state: ssDone} } - server := cs[0] - id := requester.Send(server, ReqCheckpointData(s.checkpointHash)) - s.locked = request.ServerAndID{Server: server, ID: id} } // ForwardUpdateSync implements request.Module; it fetches updates between the