beacon/light/sync: print error log if checkpoint retrieval fails (#29532)

Co-authored-by: Felix Lange <fjl@twurst.com>
pull/29634/head
Felföldi Zsolt 7 months ago committed by GitHub
parent 1ec7af2612
commit e6689fe090
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      beacon/blsync/block_sync_test.go
  2. 9
      beacon/light/api/api_server.go
  3. 17
      beacon/light/api/light_api.go
  4. 4
      beacon/light/request/scheduler.go
  5. 4
      beacon/light/request/scheduler_test.go
  6. 7
      beacon/light/request/server.go
  7. 1
      beacon/light/request/server_test.go
  8. 14
      beacon/light/sync/head_sync_test.go
  9. 4
      beacon/light/sync/test_helpers.go
  10. 6
      beacon/light/sync/types.go
  11. 139
      beacon/light/sync/update_sync.go

@ -28,8 +28,8 @@ import (
) )
var ( var (
testServer1 = "testServer1" testServer1 = testServer("testServer1")
testServer2 = "testServer2" testServer2 = testServer("testServer2")
testBlock1 = types.NewBeaconBlock(&deneb.BeaconBlock{ testBlock1 = types.NewBeaconBlock(&deneb.BeaconBlock{
Slot: 123, Slot: 123,
@ -51,6 +51,12 @@ var (
}) })
) )
type testServer string
func (t testServer) Name() string {
return string(t)
}
func TestBlockSync(t *testing.T) { func TestBlockSync(t *testing.T) {
ht := &testHeadTracker{} ht := &testHeadTracker{}
blockSync := newBeaconBlockSync(ht) blockSync := newBeaconBlockSync(ht)

@ -73,8 +73,10 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) {
r.Updates, r.Committees, err = s.api.GetBestUpdatesAndCommittees(data.FirstPeriod, data.Count) r.Updates, r.Committees, err = s.api.GetBestUpdatesAndCommittees(data.FirstPeriod, data.Count)
resp = r resp = r
case sync.ReqHeader: case sync.ReqHeader:
var r sync.RespHeader
log.Debug("Beacon API: requesting header", "reqid", id, "hash", common.Hash(data)) log.Debug("Beacon API: requesting header", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetHeader(common.Hash(data)) r.Header, r.Canonical, r.Finalized, err = s.api.GetHeader(common.Hash(data))
resp = r
case sync.ReqCheckpointData: case sync.ReqCheckpointData:
log.Debug("Beacon API: requesting checkpoint data", "reqid", id, "hash", common.Hash(data)) log.Debug("Beacon API: requesting checkpoint data", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetCheckpointData(common.Hash(data)) resp, err = s.api.GetCheckpointData(common.Hash(data))
@ -101,3 +103,8 @@ func (s *ApiServer) Unsubscribe() {
s.unsubscribe = nil s.unsubscribe = nil
} }
} }
// Name implements request.Server
func (s *ApiServer) Name() string {
return s.api.url
}

@ -291,7 +291,9 @@ func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) {
// GetHeader fetches and validates the beacon header with the given blockRoot. // GetHeader fetches and validates the beacon header with the given blockRoot.
// If blockRoot is null hash then the latest head header is fetched. // If blockRoot is null hash then the latest head header is fetched.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error) { // The values of the canonical and finalized flags are also returned. Note that
// these flags are not validated.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) {
var blockId string var blockId string
if blockRoot == (common.Hash{}) { if blockRoot == (common.Hash{}) {
blockId = "head" blockId = "head"
@ -300,11 +302,12 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error
} }
resp, err := api.httpGetf("/eth/v1/beacon/headers/%s", blockId) resp, err := api.httpGetf("/eth/v1/beacon/headers/%s", blockId)
if err != nil { if err != nil {
return types.Header{}, err return types.Header{}, false, false, err
} }
var data struct { var data struct {
Data struct { Finalized bool `json:"finalized"`
Data struct {
Root common.Hash `json:"root"` Root common.Hash `json:"root"`
Canonical bool `json:"canonical"` Canonical bool `json:"canonical"`
Header struct { Header struct {
@ -314,16 +317,16 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error
} `json:"data"` } `json:"data"`
} }
if err := json.Unmarshal(resp, &data); err != nil { if err := json.Unmarshal(resp, &data); err != nil {
return types.Header{}, err return types.Header{}, false, false, err
} }
header := data.Data.Header.Message header := data.Data.Header.Message
if blockRoot == (common.Hash{}) { if blockRoot == (common.Hash{}) {
blockRoot = data.Data.Root blockRoot = data.Data.Root
} }
if header.Hash() != blockRoot { if header.Hash() != blockRoot {
return types.Header{}, errors.New("retrieved beacon header root does not match") return types.Header{}, false, false, errors.New("retrieved beacon header root does not match")
} }
return header, nil return header, data.Data.Canonical, data.Finalized, nil
} }
// GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint. // GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint.
@ -446,7 +449,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
defer wg.Done() defer wg.Done()
// Request initial data. // Request initial data.
if head, err := api.GetHeader(common.Hash{}); err == nil { if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
listener.OnNewHead(head.Slot, head.Hash()) listener.OnNewHead(head.Slot, head.Hash())
} }
if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil { if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil {

@ -93,7 +93,9 @@ type (
// the modules that do not interact with them directly. // the modules that do not interact with them directly.
// In order to make module testing easier, Server interface is used in // In order to make module testing easier, Server interface is used in
// events and modules. // events and modules.
Server any Server interface {
Name() string
}
Request any Request any
Response any Response any
ID uint64 ID uint64

@ -70,6 +70,10 @@ type testServer struct {
canRequest int canRequest int
} }
func (s *testServer) Name() string {
return ""
}
func (s *testServer) subscribe(eventCb func(Event)) { func (s *testServer) subscribe(eventCb func(Event)) {
s.eventCb = eventCb s.eventCb = eventCb
} }

@ -58,6 +58,7 @@ const (
// EvResponse or EvFail. Additionally, it may also send application-defined // EvResponse or EvFail. Additionally, it may also send application-defined
// events that the Modules can interpret. // events that the Modules can interpret.
type requestServer interface { type requestServer interface {
Name() string
Subscribe(eventCallback func(Event)) Subscribe(eventCallback func(Event))
SendRequest(ID, Request) SendRequest(ID, Request)
Unsubscribe() Unsubscribe()
@ -69,6 +70,7 @@ type requestServer interface {
// limit the number of parallel in-flight requests and temporarily disable // limit the number of parallel in-flight requests and temporarily disable
// new requests based on timeouts and response failures. // new requests based on timeouts and response failures.
type server interface { type server interface {
Server
subscribe(eventCallback func(Event)) subscribe(eventCallback func(Event))
canRequestNow() bool canRequestNow() bool
sendRequest(Request) ID sendRequest(Request) ID
@ -138,6 +140,11 @@ type serverWithTimeout struct {
lastID ID lastID ID
} }
// Name implements request.Server
func (s *serverWithTimeout) Name() string {
return s.parent.Name()
}
// init initializes serverWithTimeout // init initializes serverWithTimeout
func (s *serverWithTimeout) init(clock mclock.Clock) { func (s *serverWithTimeout) init(clock mclock.Clock) {
s.clock = clock s.clock = clock

@ -153,6 +153,7 @@ type testRequestServer struct {
eventCb func(Event) eventCb func(Event)
} }
func (rs *testRequestServer) Name() string { return "" }
func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb } func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb }
func (rs *testRequestServer) SendRequest(ID, Request) {} func (rs *testRequestServer) SendRequest(ID, Request) {}
func (rs *testRequestServer) Unsubscribe() {} func (rs *testRequestServer) Unsubscribe() {}

@ -24,10 +24,10 @@ import (
) )
var ( var (
testServer1 = "testServer1" testServer1 = testServer("testServer1")
testServer2 = "testServer2" testServer2 = testServer("testServer2")
testServer3 = "testServer3" testServer3 = testServer("testServer3")
testServer4 = "testServer4" testServer4 = testServer("testServer4")
testHead0 = types.HeadInfo{} testHead0 = types.HeadInfo{}
testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}} testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}}
@ -42,6 +42,12 @@ var (
testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}} testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}}
) )
type testServer string
func (t testServer) Name() string {
return string(t)
}
func TestValidatedHead(t *testing.T) { func TestValidatedHead(t *testing.T) {
chain := &TestCommitteeChain{} chain := &TestCommitteeChain{}
ht := &TestHeadTracker{} ht := &TestHeadTracker{}

@ -75,7 +75,7 @@ func (ts *TestScheduler) Run(testIndex int, exp ...any) {
if count == 0 { if count == 0 {
continue continue
} }
ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.(string), testIndex) ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.Name(), testIndex)
} }
if !reflect.DeepEqual(ts.sent[testIndex], expReqs) { if !reflect.DeepEqual(ts.sent[testIndex], expReqs) {
@ -104,7 +104,7 @@ func (ts *TestScheduler) Send(server request.Server, req request.Request) reques
func (ts *TestScheduler) Fail(server request.Server, desc string) { func (ts *TestScheduler) Fail(server request.Server, desc string) {
if ts.expFail[server] == 0 { if ts.expFail[server] == 0 {
ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.(string), ts.testIndex, desc) ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.Name(), ts.testIndex, desc)
return return
} }
ts.expFail[server]-- ts.expFail[server]--

@ -36,7 +36,11 @@ type (
Updates []*types.LightClientUpdate Updates []*types.LightClientUpdate
Committees []*types.SerializedSyncCommittee Committees []*types.SerializedSyncCommittee
} }
ReqHeader common.Hash ReqHeader common.Hash
RespHeader struct {
Header types.Header
Canonical, Finalized bool
}
ReqCheckpointData common.Hash ReqCheckpointData common.Hash
ReqBeaconBlock common.Hash ReqBeaconBlock common.Hash
) )

@ -21,6 +21,7 @@ import (
"github.com/ethereum/go-ethereum/beacon/light" "github.com/ethereum/go-ethereum/beacon/light"
"github.com/ethereum/go-ethereum/beacon/light/request" "github.com/ethereum/go-ethereum/beacon/light/request"
"github.com/ethereum/go-ethereum/beacon/params"
"github.com/ethereum/go-ethereum/beacon/types" "github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -42,6 +43,31 @@ type CheckpointInit struct {
checkpointHash common.Hash checkpointHash common.Hash
locked request.ServerAndID locked request.ServerAndID
initialized bool initialized bool
// per-server state is used to track the state of requesting checkpoint header
// info. Part of this info (canonical and finalized state) is not validated
// and therefore it is requested from each server separately after it has
// reported a missing checkpoint (which is also not validated info).
serverState map[request.Server]serverState
// the following fields are used to determine whether the checkpoint is on
// epoch boundary. This information is validated and therefore stored globally.
parentHash common.Hash
hasEpochInfo, epochBoundary bool
cpSlot, parentSlot uint64
}
const (
ssDefault = iota // no action yet or checkpoint requested
ssNeedHeader // checkpoint req failed, need cp header
ssHeaderRequested // cp header requested
ssNeedParent // cp header slot %32 != 0, need parent to check epoch boundary
ssParentRequested // cp parent header requested
ssPrintStatus // has all necessary info, print log message if init still not successful
ssDone // log message printed, no more action required
)
type serverState struct {
state int
hasHeader, canonical, finalized bool // stored per server because not validated
} }
// NewCheckpointInit creates a new CheckpointInit. // NewCheckpointInit creates a new CheckpointInit.
@ -49,40 +75,109 @@ func NewCheckpointInit(chain committeeChain, checkpointHash common.Hash) *Checkp
return &CheckpointInit{ return &CheckpointInit{
chain: chain, chain: chain,
checkpointHash: checkpointHash, checkpointHash: checkpointHash,
serverState: make(map[request.Server]serverState),
} }
} }
// Process implements request.Module. // Process implements request.Module.
func (s *CheckpointInit) Process(requester request.Requester, events []request.Event) { func (s *CheckpointInit) Process(requester request.Requester, events []request.Event) {
if s.initialized {
return
}
for _, event := range events { for _, event := range events {
if !event.IsRequestEvent() { switch event.Type {
continue case request.EvResponse, request.EvFail, request.EvTimeout:
} sid, req, resp := event.RequestInfo()
sid, req, resp := event.RequestInfo() if s.locked == sid {
if s.locked == sid { s.locked = request.ServerAndID{}
s.locked = request.ServerAndID{}
}
if resp != nil {
if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) {
s.chain.CheckpointInit(*checkpoint)
s.initialized = true
return
} }
if event.Type == request.EvTimeout {
requester.Fail(event.Server, "invalid checkpoint data") continue
}
switch s.serverState[sid.Server].state {
case ssDefault:
if resp != nil {
if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) {
s.chain.CheckpointInit(*checkpoint)
s.initialized = true
return
}
requester.Fail(event.Server, "invalid checkpoint data")
}
s.serverState[sid.Server] = serverState{state: ssNeedHeader}
case ssHeaderRequested:
if resp == nil {
s.serverState[sid.Server] = serverState{state: ssPrintStatus}
continue
}
newState := serverState{
hasHeader: true,
canonical: resp.(RespHeader).Canonical,
finalized: resp.(RespHeader).Finalized,
}
s.cpSlot, s.parentHash = resp.(RespHeader).Header.Slot, resp.(RespHeader).Header.ParentRoot
if s.cpSlot%params.EpochLength == 0 {
s.hasEpochInfo, s.epochBoundary = true, true
}
if s.hasEpochInfo {
newState.state = ssPrintStatus
} else {
newState.state = ssNeedParent
}
s.serverState[sid.Server] = newState
case ssParentRequested:
s.parentSlot = resp.(RespHeader).Header.Slot
s.hasEpochInfo, s.epochBoundary = true, s.cpSlot/params.EpochLength > s.parentSlot/params.EpochLength
newState := s.serverState[sid.Server]
newState.state = ssPrintStatus
s.serverState[sid.Server] = newState
}
case request.EvUnregistered:
delete(s.serverState, event.Server)
} }
} }
// start a request if possible // start a request if possible
if s.initialized || s.locked != (request.ServerAndID{}) { for _, server := range requester.CanSendTo() {
return switch s.serverState[server].state {
case ssDefault:
if s.locked == (request.ServerAndID{}) {
id := requester.Send(server, ReqCheckpointData(s.checkpointHash))
s.locked = request.ServerAndID{Server: server, ID: id}
}
case ssNeedHeader:
requester.Send(server, ReqHeader(s.checkpointHash))
newState := s.serverState[server]
newState.state = ssHeaderRequested
s.serverState[server] = newState
case ssNeedParent:
requester.Send(server, ReqHeader(s.parentHash))
newState := s.serverState[server]
newState.state = ssParentRequested
s.serverState[server] = newState
}
} }
cs := requester.CanSendTo() // print log message if necessary
if len(cs) == 0 { for server, state := range s.serverState {
return if state.state != ssPrintStatus {
continue
}
switch {
case !state.hasHeader:
log.Error("blsync: checkpoint block is not available, reported as unknown", "server", server.Name())
case !state.canonical:
log.Error("blsync: checkpoint block is not available, reported as non-canonical", "server", server.Name())
case !s.hasEpochInfo:
// should be available if hasHeader is true and state is ssPrintStatus
panic("checkpoint epoch info not available when printing retrieval status")
case !s.epochBoundary:
log.Error("blsync: checkpoint block is not first of epoch", "slot", s.cpSlot, "parent", s.parentSlot, "server", server.Name())
case !state.finalized:
log.Error("blsync: checkpoint block is reported as non-finalized", "server", server.Name())
default:
log.Error("blsync: checkpoint not available, but reported as finalized; specified checkpoint hash might be too old", "server", server.Name())
}
s.serverState[server] = serverState{state: ssDone}
} }
server := cs[0]
id := requester.Send(server, ReqCheckpointData(s.checkpointHash))
s.locked = request.ServerAndID{Server: server, ID: id}
} }
// ForwardUpdateSync implements request.Module; it fetches updates between the // ForwardUpdateSync implements request.Module; it fetches updates between the

Loading…
Cancel
Save