diff --git a/beacon/blsync/block_sync.go b/beacon/blsync/block_sync.go index 3ab156354d..ff689a922f 100755 --- a/beacon/blsync/block_sync.go +++ b/beacon/blsync/block_sync.go @@ -41,7 +41,7 @@ type beaconBlockSync struct { type headTracker interface { PrefetchHead() types.HeadInfo - ValidatedHead() (types.SignedHeader, bool) + ValidatedOptimistic() (types.OptimisticUpdate, bool) ValidatedFinality() (types.FinalityUpdate, bool) } @@ -66,6 +66,7 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request. case request.EvResponse, request.EvFail, request.EvTimeout: sid, req, resp := event.RequestInfo() blockRoot := common.Hash(req.(sync.ReqBeaconBlock)) + log.Debug("Beacon block event", "type", event.Type.Name, "hash", blockRoot) if resp != nil { s.recentBlocks.Add(blockRoot, resp.(*types.BeaconBlock)) } @@ -80,8 +81,8 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request. } s.updateEventFeed() // request validated head block if unavailable and not yet requested - if vh, ok := s.headTracker.ValidatedHead(); ok { - s.tryRequestBlock(requester, vh.Header.Hash(), false) + if vh, ok := s.headTracker.ValidatedOptimistic(); ok { + s.tryRequestBlock(requester, vh.Attested.Hash(), false) } // request prefetch head if the given server has announced it if prefetchHead := s.headTracker.PrefetchHead().BlockRoot; prefetchHead != (common.Hash{}) { @@ -114,12 +115,12 @@ func blockHeadInfo(block *types.BeaconBlock) types.HeadInfo { } func (s *beaconBlockSync) updateEventFeed() { - head, ok := s.headTracker.ValidatedHead() + optimistic, ok := s.headTracker.ValidatedOptimistic() if !ok { return } - validatedHead := head.Header.Hash() + validatedHead := optimistic.Attested.Hash() headBlock, ok := s.recentBlocks.Get(validatedHead) if !ok { return @@ -127,7 +128,7 @@ func (s *beaconBlockSync) updateEventFeed() { var finalizedHash common.Hash if finality, ok := s.headTracker.ValidatedFinality(); ok { - he := head.Header.Epoch() + he := optimistic.Attested.Epoch() fe := finality.Attested.Header.Epoch() switch { case he == fe: @@ -135,10 +136,9 @@ func (s *beaconBlockSync) updateEventFeed() { case he < fe: return case he == fe+1: - parent, ok := s.recentBlocks.Get(head.Header.ParentRoot) + parent, ok := s.recentBlocks.Get(optimistic.Attested.ParentRoot) if !ok || parent.Slot()/params.EpochLength == fe { return // head is at first slot of next epoch, wait for finality update - //TODO: try to fetch finality update directly if subscription does not deliver } } } @@ -156,7 +156,7 @@ func (s *beaconBlockSync) updateEventFeed() { return } s.chainHeadFeed.Send(types.ChainHeadEvent{ - BeaconHead: head.Header, + BeaconHead: optimistic.Attested.Header, Block: execBlock, Finalized: finalizedHash, }) diff --git a/beacon/blsync/block_sync_test.go b/beacon/blsync/block_sync_test.go index 0525e95a89..3d3b9e5e8d 100644 --- a/beacon/blsync/block_sync_test.go +++ b/beacon/blsync/block_sync_test.go @@ -140,8 +140,12 @@ func (h *testHeadTracker) PrefetchHead() types.HeadInfo { return h.prefetch } -func (h *testHeadTracker) ValidatedHead() (types.SignedHeader, bool) { - return h.validated, h.validated.Header != (types.Header{}) +func (h *testHeadTracker) ValidatedOptimistic() (types.OptimisticUpdate, bool) { + return types.OptimisticUpdate{ + Attested: types.HeaderWithExecProof{Header: h.validated.Header}, + Signature: h.validated.Signature, + SignatureSlot: h.validated.SignatureSlot, + }, h.validated.Header != (types.Header{}) } // TODO add test case for finality diff --git a/beacon/blsync/engineclient.go b/beacon/blsync/engineclient.go index 5a2d292a7d..97ef6f5cb8 100644 --- a/beacon/blsync/engineclient.go +++ b/beacon/blsync/engineclient.go @@ -62,6 +62,7 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) { for { select { case <-ec.rootCtx.Done(): + log.Debug("Stopping engine API update loop") return case event := <-headCh: @@ -73,12 +74,14 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) { fork := ec.config.ForkAtEpoch(event.BeaconHead.Epoch()) forkName := strings.ToLower(fork.Name) + log.Debug("Calling NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash()) if status, err := ec.callNewPayload(forkName, event); err == nil { log.Info("Successful NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "status", status) } else { log.Error("Failed NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "error", err) } + log.Debug("Calling ForkchoiceUpdated", "head", event.Block.Hash()) if status, err := ec.callForkchoiceUpdated(forkName, event); err == nil { log.Info("Successful ForkchoiceUpdated", "head", event.Block.Hash(), "status", status) } else { diff --git a/beacon/light/api/api_server.go b/beacon/light/api/api_server.go index 4b885cb8e1..2579854d82 100755 --- a/beacon/light/api/api_server.go +++ b/beacon/light/api/api_server.go @@ -46,13 +46,13 @@ func (s *ApiServer) Subscribe(eventCallback func(event request.Event)) { log.Debug("New head received", "slot", slot, "blockRoot", blockRoot) eventCallback(request.Event{Type: sync.EvNewHead, Data: types.HeadInfo{Slot: slot, BlockRoot: blockRoot}}) }, - OnSignedHead: func(head types.SignedHeader) { - log.Debug("New signed head received", "slot", head.Header.Slot, "blockRoot", head.Header.Hash(), "signerCount", head.Signature.SignerCount()) - eventCallback(request.Event{Type: sync.EvNewSignedHead, Data: head}) + OnOptimistic: func(update types.OptimisticUpdate) { + log.Debug("New optimistic update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount()) + eventCallback(request.Event{Type: sync.EvNewOptimisticUpdate, Data: update}) }, - OnFinality: func(head types.FinalityUpdate) { - log.Debug("New finality update received", "slot", head.Attested.Slot, "blockRoot", head.Attested.Hash(), "signerCount", head.Signature.SignerCount()) - eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: head}) + OnFinality: func(update types.FinalityUpdate) { + log.Debug("New finality update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount()) + eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: update}) }, OnError: func(err error) { log.Warn("Head event stream error", "err", err) @@ -83,6 +83,9 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) { case sync.ReqBeaconBlock: log.Debug("Beacon API: requesting block", "reqid", id, "hash", common.Hash(data)) resp, err = s.api.GetBeaconBlock(common.Hash(data)) + case sync.ReqFinality: + log.Debug("Beacon API: requesting finality update") + resp, err = s.api.GetFinalityUpdate() default: } @@ -90,6 +93,7 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) { log.Warn("Beacon API request failed", "type", reflect.TypeOf(req), "reqid", id, "err", err) s.eventCallback(request.Event{Type: request.EvFail, Data: request.RequestResponse{ID: id, Request: req}}) } else { + log.Debug("Beacon API request answered", "type", reflect.TypeOf(req), "reqid", id) s.eventCallback(request.Event{Type: request.EvResponse, Data: request.RequestResponse{ID: id, Request: req, Response: resp}}) } }() diff --git a/beacon/light/api/light_api.go b/beacon/light/api/light_api.go index 6892407caf..903db57344 100755 --- a/beacon/light/api/light_api.go +++ b/beacon/light/api/light_api.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/beacon/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" ) var ( @@ -184,46 +185,56 @@ func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64 return updates, committees, nil } -// GetOptimisticHeadUpdate fetches a signed header based on the latest available -// optimistic update. Note that the signature should be verified by the caller -// as its validity depends on the update chain. +// GetOptimisticUpdate fetches the latest available optimistic update. +// Note that the signature should be verified by the caller as its validity +// depends on the update chain. // // See data structure definition here: // https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate -func (api *BeaconLightApi) GetOptimisticHeadUpdate() (types.SignedHeader, error) { +func (api *BeaconLightApi) GetOptimisticUpdate() (types.OptimisticUpdate, error) { resp, err := api.httpGet("/eth/v1/beacon/light_client/optimistic_update") if err != nil { - return types.SignedHeader{}, err + return types.OptimisticUpdate{}, err } - return decodeOptimisticHeadUpdate(resp) + return decodeOptimisticUpdate(resp) } -func decodeOptimisticHeadUpdate(enc []byte) (types.SignedHeader, error) { +func decodeOptimisticUpdate(enc []byte) (types.OptimisticUpdate, error) { var data struct { - Data struct { - Header jsonBeaconHeader `json:"attested_header"` - Aggregate types.SyncAggregate `json:"sync_aggregate"` - SignatureSlot common.Decimal `json:"signature_slot"` + Version string + Data struct { + Attested jsonHeaderWithExecProof `json:"attested_header"` + Aggregate types.SyncAggregate `json:"sync_aggregate"` + SignatureSlot common.Decimal `json:"signature_slot"` } `json:"data"` } if err := json.Unmarshal(enc, &data); err != nil { - return types.SignedHeader{}, err + return types.OptimisticUpdate{}, err + } + // Decode the execution payload headers. + attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution) + if err != nil { + return types.OptimisticUpdate{}, fmt.Errorf("invalid attested header: %v", err) } - if data.Data.Header.Beacon.StateRoot == (common.Hash{}) { + if data.Data.Attested.Beacon.StateRoot == (common.Hash{}) { // workaround for different event encoding format in Lodestar if err := json.Unmarshal(enc, &data.Data); err != nil { - return types.SignedHeader{}, err + return types.OptimisticUpdate{}, err } } if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize { - return types.SignedHeader{}, errors.New("invalid sync_committee_bits length") + return types.OptimisticUpdate{}, errors.New("invalid sync_committee_bits length") } if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize { - return types.SignedHeader{}, errors.New("invalid sync_committee_signature length") + return types.OptimisticUpdate{}, errors.New("invalid sync_committee_signature length") } - return types.SignedHeader{ - Header: data.Data.Header.Beacon, + return types.OptimisticUpdate{ + Attested: types.HeaderWithExecProof{ + Header: data.Data.Attested.Beacon, + PayloadHeader: attestedExecHeader, + PayloadBranch: data.Data.Attested.ExecutionBranch, + }, Signature: data.Data.Aggregate, SignatureSlot: uint64(data.Data.SignatureSlot), }, nil @@ -411,7 +422,7 @@ func decodeHeadEvent(enc []byte) (uint64, common.Hash, error) { type HeadEventListener struct { OnNewHead func(slot uint64, blockRoot common.Hash) - OnSignedHead func(head types.SignedHeader) + OnOptimistic func(head types.OptimisticUpdate) OnFinality func(head types.FinalityUpdate) OnError func(err error) } @@ -449,21 +460,35 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() defer wg.Done() // Request initial data. + log.Trace("Requesting initial head header") if head, _, _, err := api.GetHeader(common.Hash{}); err == nil { + log.Trace("Retrieved initial head header", "slot", head.Slot, "hash", head.Hash()) listener.OnNewHead(head.Slot, head.Hash()) + } else { + log.Debug("Failed to retrieve initial head header", "error", err) } - if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil { - listener.OnSignedHead(signedHead) + log.Trace("Requesting initial optimistic update") + if optimisticUpdate, err := api.GetOptimisticUpdate(); err == nil { + log.Trace("Retrieved initial optimistic update", "slot", optimisticUpdate.Attested.Slot, "hash", optimisticUpdate.Attested.Hash()) + listener.OnOptimistic(optimisticUpdate) + } else { + log.Debug("Failed to retrieve initial optimistic update", "error", err) } + log.Trace("Requesting initial finality update") if finalityUpdate, err := api.GetFinalityUpdate(); err == nil { + log.Trace("Retrieved initial finality update", "slot", finalityUpdate.Finalized.Slot, "hash", finalityUpdate.Finalized.Hash()) listener.OnFinality(finalityUpdate) + } else { + log.Debug("Failed to retrieve initial finality update", "error", err) } + log.Trace("Starting event stream processing loop") // Receive the stream. var stream *eventsource.Stream select { case stream = <-streamCh: case <-ctx.Done(): + log.Trace("Stopping event stream processing loop") return } @@ -474,8 +499,10 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() case event, ok := <-stream.Events: if !ok { + log.Trace("Event stream closed") return } + log.Trace("New event received from event stream", "type", event.Event()) switch event.Event() { case "head": slot, blockRoot, err := decodeHeadEvent([]byte(event.Data())) @@ -485,9 +512,9 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() listener.OnError(fmt.Errorf("error decoding head event: %v", err)) } case "light_client_optimistic_update": - signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data())) + optimisticUpdate, err := decodeOptimisticUpdate([]byte(event.Data())) if err == nil { - listener.OnSignedHead(signedHead) + listener.OnOptimistic(optimisticUpdate) } else { listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err)) } @@ -521,7 +548,8 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() // established. It can only return nil when the context is canceled. func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream { for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) { - path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update" + path := "/eth/v1/events?topics=head&topics=light_client_finality_update&topics=light_client_optimistic_update" + log.Trace("Sending event subscription request") req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil) if err != nil { listener.OnError(fmt.Errorf("error creating event subscription request: %v", err)) @@ -535,6 +563,7 @@ func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadE listener.OnError(fmt.Errorf("error creating event subscription: %v", err)) continue } + log.Trace("Successfully created event stream") return stream } return nil diff --git a/beacon/light/head_tracker.go b/beacon/light/head_tracker.go index 6036322f01..7ef93fecce 100644 --- a/beacon/light/head_tracker.go +++ b/beacon/light/head_tracker.go @@ -29,15 +29,15 @@ import ( // which is the (not necessarily validated) head announced by the majority of // servers. type HeadTracker struct { - lock sync.RWMutex - committeeChain *CommitteeChain - minSignerCount int - signedHead types.SignedHeader - hasSignedHead bool - finalityUpdate types.FinalityUpdate - hasFinalityUpdate bool - prefetchHead types.HeadInfo - changeCounter uint64 + lock sync.RWMutex + committeeChain *CommitteeChain + minSignerCount int + optimisticUpdate types.OptimisticUpdate + hasOptimisticUpdate bool + finalityUpdate types.FinalityUpdate + hasFinalityUpdate bool + prefetchHead types.HeadInfo + changeCounter uint64 } // NewHeadTracker creates a new HeadTracker. @@ -48,15 +48,15 @@ func NewHeadTracker(committeeChain *CommitteeChain, minSignerCount int) *HeadTra } } -// ValidatedHead returns the latest validated head. -func (h *HeadTracker) ValidatedHead() (types.SignedHeader, bool) { +// ValidatedOptimistic returns the latest validated optimistic update. +func (h *HeadTracker) ValidatedOptimistic() (types.OptimisticUpdate, bool) { h.lock.RLock() defer h.lock.RUnlock() - return h.signedHead, h.hasSignedHead + return h.optimisticUpdate, h.hasOptimisticUpdate } -// ValidatedFinality returns the latest validated finality. +// ValidatedFinality returns the latest validated finality update. func (h *HeadTracker) ValidatedFinality() (types.FinalityUpdate, bool) { h.lock.RLock() defer h.lock.RUnlock() @@ -64,26 +64,36 @@ func (h *HeadTracker) ValidatedFinality() (types.FinalityUpdate, bool) { return h.finalityUpdate, h.hasFinalityUpdate } -// ValidateHead validates the given signed head. If the head is successfully validated -// and it is better than the old validated head (higher slot or same slot and more -// signers) then ValidatedHead is updated. The boolean return flag signals if -// ValidatedHead has been changed. -func (h *HeadTracker) ValidateHead(head types.SignedHeader) (bool, error) { +// ValidateOptimistic validates the given optimistic update. If the update is +// successfully validated and it is better than the old validated update (higher +// slot or same slot and more signers) then ValidatedOptimistic is updated. +// The boolean return flag signals if ValidatedOptimistic has been changed. +func (h *HeadTracker) ValidateOptimistic(update types.OptimisticUpdate) (bool, error) { h.lock.Lock() defer h.lock.Unlock() - replace, err := h.validate(head, h.signedHead) + if err := update.Validate(); err != nil { + return false, err + } + replace, err := h.validate(update.SignedHeader(), h.optimisticUpdate.SignedHeader()) if replace { - h.signedHead, h.hasSignedHead = head, true + h.optimisticUpdate, h.hasOptimisticUpdate = update, true h.changeCounter++ } return replace, err } +// ValidateFinality validates the given finality update. If the update is +// successfully validated and it is better than the old validated update (higher +// slot or same slot and more signers) then ValidatedFinality is updated. +// The boolean return flag signals if ValidatedFinality has been changed. func (h *HeadTracker) ValidateFinality(update types.FinalityUpdate) (bool, error) { h.lock.Lock() defer h.lock.Unlock() + if err := update.Validate(); err != nil { + return false, err + } replace, err := h.validate(update.SignedHeader(), h.finalityUpdate.SignedHeader()) if replace { h.finalityUpdate, h.hasFinalityUpdate = update, true @@ -142,6 +152,7 @@ func (h *HeadTracker) SetPrefetchHead(head types.HeadInfo) { h.changeCounter++ } +// ChangeCounter implements request.targetData func (h *HeadTracker) ChangeCounter() uint64 { h.lock.RLock() defer h.lock.RUnlock() diff --git a/beacon/light/sync/head_sync.go b/beacon/light/sync/head_sync.go index 5ccc2e18a2..dd05d39588 100644 --- a/beacon/light/sync/head_sync.go +++ b/beacon/light/sync/head_sync.go @@ -19,11 +19,13 @@ package sync import ( "github.com/ethereum/go-ethereum/beacon/light/request" "github.com/ethereum/go-ethereum/beacon/types" + "github.com/ethereum/go-ethereum/log" ) type headTracker interface { - ValidateHead(head types.SignedHeader) (bool, error) + ValidateOptimistic(update types.OptimisticUpdate) (bool, error) ValidateFinality(head types.FinalityUpdate) (bool, error) + ValidatedFinality() (types.FinalityUpdate, bool) SetPrefetchHead(head types.HeadInfo) } @@ -33,16 +35,17 @@ type headTracker interface { // It can also postpone the validation of the latest announced signed head // until the committee chain is synced up to at least the required period. type HeadSync struct { - headTracker headTracker - chain committeeChain - nextSyncPeriod uint64 - chainInit bool - unvalidatedHeads map[request.Server]types.SignedHeader - unvalidatedFinality map[request.Server]types.FinalityUpdate - serverHeads map[request.Server]types.HeadInfo - headServerCount map[types.HeadInfo]headServerCount - headCounter uint64 - prefetchHead types.HeadInfo + headTracker headTracker + chain committeeChain + nextSyncPeriod uint64 + chainInit bool + unvalidatedOptimistic map[request.Server]types.OptimisticUpdate + unvalidatedFinality map[request.Server]types.FinalityUpdate + serverHeads map[request.Server]types.HeadInfo + reqFinalityEpoch map[request.Server]uint64 // next epoch to request finality update + headServerCount map[types.HeadInfo]headServerCount + headCounter uint64 + prefetchHead types.HeadInfo } // headServerCount is associated with most recently seen head infos; it counts @@ -57,75 +60,98 @@ type headServerCount struct { // NewHeadSync creates a new HeadSync. func NewHeadSync(headTracker headTracker, chain committeeChain) *HeadSync { s := &HeadSync{ - headTracker: headTracker, - chain: chain, - unvalidatedHeads: make(map[request.Server]types.SignedHeader), - unvalidatedFinality: make(map[request.Server]types.FinalityUpdate), - serverHeads: make(map[request.Server]types.HeadInfo), - headServerCount: make(map[types.HeadInfo]headServerCount), + headTracker: headTracker, + chain: chain, + unvalidatedOptimistic: make(map[request.Server]types.OptimisticUpdate), + unvalidatedFinality: make(map[request.Server]types.FinalityUpdate), + serverHeads: make(map[request.Server]types.HeadInfo), + headServerCount: make(map[types.HeadInfo]headServerCount), + reqFinalityEpoch: make(map[request.Server]uint64), } return s } // Process implements request.Module. func (s *HeadSync) Process(requester request.Requester, events []request.Event) { + nextPeriod, chainInit := s.chain.NextSyncPeriod() + if nextPeriod != s.nextSyncPeriod || chainInit != s.chainInit { + s.nextSyncPeriod, s.chainInit = nextPeriod, chainInit + s.processUnvalidatedUpdates() + } + for _, event := range events { switch event.Type { case EvNewHead: s.setServerHead(event.Server, event.Data.(types.HeadInfo)) - case EvNewSignedHead: - s.newSignedHead(event.Server, event.Data.(types.SignedHeader)) + case EvNewOptimisticUpdate: + update := event.Data.(types.OptimisticUpdate) + s.newOptimisticUpdate(event.Server, update) + epoch := update.Attested.Epoch() + if epoch < s.reqFinalityEpoch[event.Server] { + continue + } + if finality, ok := s.headTracker.ValidatedFinality(); ok && finality.Attested.Header.Epoch() >= epoch { + continue + } + requester.Send(event.Server, ReqFinality{}) + s.reqFinalityEpoch[event.Server] = epoch + 1 case EvNewFinalityUpdate: s.newFinalityUpdate(event.Server, event.Data.(types.FinalityUpdate)) + case request.EvResponse: + _, _, resp := event.RequestInfo() + s.newFinalityUpdate(event.Server, resp.(types.FinalityUpdate)) case request.EvUnregistered: s.setServerHead(event.Server, types.HeadInfo{}) delete(s.serverHeads, event.Server) - delete(s.unvalidatedHeads, event.Server) + delete(s.unvalidatedOptimistic, event.Server) + delete(s.unvalidatedFinality, event.Server) } } - - nextPeriod, chainInit := s.chain.NextSyncPeriod() - if nextPeriod != s.nextSyncPeriod || chainInit != s.chainInit { - s.nextSyncPeriod, s.chainInit = nextPeriod, chainInit - s.processUnvalidated() - } } -// newSignedHead handles received signed head; either validates it if the chain -// is properly synced or stores it for further validation. -func (s *HeadSync) newSignedHead(server request.Server, signedHead types.SignedHeader) { - if !s.chainInit || types.SyncPeriod(signedHead.SignatureSlot) > s.nextSyncPeriod { - s.unvalidatedHeads[server] = signedHead +// newOptimisticUpdate handles received optimistic update; either validates it if +// the chain is properly synced or stores it for further validation. +func (s *HeadSync) newOptimisticUpdate(server request.Server, optimisticUpdate types.OptimisticUpdate) { + if !s.chainInit || types.SyncPeriod(optimisticUpdate.SignatureSlot) > s.nextSyncPeriod { + s.unvalidatedOptimistic[server] = optimisticUpdate return } - s.headTracker.ValidateHead(signedHead) + if _, err := s.headTracker.ValidateOptimistic(optimisticUpdate); err != nil { + log.Debug("Error validating optimistic update", "error", err) + } } -// newFinalityUpdate handles received finality update; either validates it if the chain -// is properly synced or stores it for further validation. +// newFinalityUpdate handles received finality update; either validates it if +// the chain is properly synced or stores it for further validation. func (s *HeadSync) newFinalityUpdate(server request.Server, finalityUpdate types.FinalityUpdate) { if !s.chainInit || types.SyncPeriod(finalityUpdate.SignatureSlot) > s.nextSyncPeriod { s.unvalidatedFinality[server] = finalityUpdate return } - s.headTracker.ValidateFinality(finalityUpdate) + if _, err := s.headTracker.ValidateFinality(finalityUpdate); err != nil { + log.Debug("Error validating finality update", "error", err) + } } -// processUnvalidated iterates the list of unvalidated heads and validates +// processUnvalidatedUpdates iterates the list of unvalidated updates and validates // those which can be validated. -func (s *HeadSync) processUnvalidated() { +func (s *HeadSync) processUnvalidatedUpdates() { if !s.chainInit { return } - for server, signedHead := range s.unvalidatedHeads { - if types.SyncPeriod(signedHead.SignatureSlot) <= s.nextSyncPeriod { - s.headTracker.ValidateHead(signedHead) - delete(s.unvalidatedHeads, server) + for server, optimisticUpdate := range s.unvalidatedOptimistic { + if types.SyncPeriod(optimisticUpdate.SignatureSlot) <= s.nextSyncPeriod { + if _, err := s.headTracker.ValidateOptimistic(optimisticUpdate); err != nil { + log.Debug("Error validating deferred optimistic update", "error", err) + } + delete(s.unvalidatedOptimistic, server) } } for server, finalityUpdate := range s.unvalidatedFinality { if types.SyncPeriod(finalityUpdate.SignatureSlot) <= s.nextSyncPeriod { - s.headTracker.ValidateFinality(finalityUpdate) + if _, err := s.headTracker.ValidateFinality(finalityUpdate); err != nil { + log.Debug("Error validating deferred finality update", "error", err) + } delete(s.unvalidatedFinality, server) } } diff --git a/beacon/light/sync/head_sync_test.go b/beacon/light/sync/head_sync_test.go index a2870b2732..cd7dacf7fe 100644 --- a/beacon/light/sync/head_sync_test.go +++ b/beacon/light/sync/head_sync_test.go @@ -19,6 +19,7 @@ package sync import ( "testing" + "github.com/ethereum/go-ethereum/beacon/light/request" "github.com/ethereum/go-ethereum/beacon/types" "github.com/ethereum/go-ethereum/common" ) @@ -28,6 +29,7 @@ var ( testServer2 = testServer("testServer2") testServer3 = testServer("testServer3") testServer4 = testServer("testServer4") + testServer5 = testServer("testServer5") testHead0 = types.HeadInfo{} testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}} @@ -35,13 +37,21 @@ var ( testHead3 = types.HeadInfo{Slot: 124, BlockRoot: common.Hash{3}} testHead4 = types.HeadInfo{Slot: 125, BlockRoot: common.Hash{4}} - testSHead1 = types.SignedHeader{SignatureSlot: 0x0124, Header: types.Header{Slot: 0x0123, StateRoot: common.Hash{1}}} - testSHead2 = types.SignedHeader{SignatureSlot: 0x2010, Header: types.Header{Slot: 0x200e, StateRoot: common.Hash{2}}} - // testSHead3 is at the end of period 1 but signed in period 2 - testSHead3 = types.SignedHeader{SignatureSlot: 0x4000, Header: types.Header{Slot: 0x3fff, StateRoot: common.Hash{3}}} - testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}} + testOptUpdate1 = types.OptimisticUpdate{SignatureSlot: 0x0124, Attested: types.HeaderWithExecProof{Header: types.Header{Slot: 0x0123, StateRoot: common.Hash{1}}}} + testOptUpdate2 = types.OptimisticUpdate{SignatureSlot: 0x2010, Attested: types.HeaderWithExecProof{Header: types.Header{Slot: 0x200e, StateRoot: common.Hash{2}}}} + // testOptUpdate3 is at the end of period 1 but signed in period 2 + testOptUpdate3 = types.OptimisticUpdate{SignatureSlot: 0x4000, Attested: types.HeaderWithExecProof{Header: types.Header{Slot: 0x3fff, StateRoot: common.Hash{3}}}} + testOptUpdate4 = types.OptimisticUpdate{SignatureSlot: 0x6444, Attested: types.HeaderWithExecProof{Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}}} ) +func finality(opt types.OptimisticUpdate) types.FinalityUpdate { + return types.FinalityUpdate{ + SignatureSlot: opt.SignatureSlot, + Attested: opt.Attested, + Finalized: types.HeaderWithExecProof{Header: types.Header{Slot: (opt.Attested.Header.Slot - 64) & uint64(0xffffffffffffffe0)}}, + } +} + type testServer string func (t testServer) Name() string { @@ -57,50 +67,66 @@ func TestValidatedHead(t *testing.T) { ht.ExpValidated(t, 0, nil) ts.AddServer(testServer1, 1) - ts.ServerEvent(EvNewSignedHead, testServer1, testSHead1) - ts.Run(1) + ts.ServerEvent(EvNewOptimisticUpdate, testServer1, testOptUpdate1) + ts.Run(1, testServer1, ReqFinality{}) // announced head should be queued because of uninitialized chain ht.ExpValidated(t, 1, nil) chain.SetNextSyncPeriod(0) // initialize chain ts.Run(2) // expect previously queued head to be validated - ht.ExpValidated(t, 2, []types.SignedHeader{testSHead1}) + ht.ExpValidated(t, 2, []types.OptimisticUpdate{testOptUpdate1}) chain.SetNextSyncPeriod(1) - ts.ServerEvent(EvNewSignedHead, testServer1, testSHead2) + ts.ServerEvent(EvNewFinalityUpdate, testServer1, finality(testOptUpdate2)) + ts.ServerEvent(EvNewOptimisticUpdate, testServer1, testOptUpdate2) ts.AddServer(testServer2, 1) - ts.ServerEvent(EvNewSignedHead, testServer2, testSHead2) + ts.ServerEvent(EvNewOptimisticUpdate, testServer2, testOptUpdate2) ts.Run(3) // expect both head announcements to be validated instantly - ht.ExpValidated(t, 3, []types.SignedHeader{testSHead2, testSHead2}) + ht.ExpValidated(t, 3, []types.OptimisticUpdate{testOptUpdate2, testOptUpdate2}) - ts.ServerEvent(EvNewSignedHead, testServer1, testSHead3) + ts.ServerEvent(EvNewOptimisticUpdate, testServer1, testOptUpdate3) ts.AddServer(testServer3, 1) - ts.ServerEvent(EvNewSignedHead, testServer3, testSHead4) - ts.Run(4) - // future period announced heads should be queued + ts.ServerEvent(EvNewOptimisticUpdate, testServer3, testOptUpdate4) + // finality should be requested from both servers + ts.Run(4, testServer1, ReqFinality{}, testServer3, ReqFinality{}) + // future period annonced heads should be queued ht.ExpValidated(t, 4, nil) chain.SetNextSyncPeriod(2) ts.Run(5) - // testSHead3 can be validated now but not testSHead4 - ht.ExpValidated(t, 5, []types.SignedHeader{testSHead3}) + // testOptUpdate3 can be validated now but not testOptUpdate4 + ht.ExpValidated(t, 5, []types.OptimisticUpdate{testOptUpdate3}) + + ts.AddServer(testServer4, 1) + ts.ServerEvent(EvNewOptimisticUpdate, testServer4, testOptUpdate3) + // new server joined with recent optimistic update but still no finality; should be requested + ts.Run(6, testServer4, ReqFinality{}) + ht.ExpValidated(t, 6, []types.OptimisticUpdate{testOptUpdate3}) + + ts.AddServer(testServer5, 1) + ts.RequestEvent(request.EvResponse, ts.Request(6, 1), finality(testOptUpdate3)) + ts.ServerEvent(EvNewOptimisticUpdate, testServer5, testOptUpdate3) + // finality update request answered; new server should not be requested + ts.Run(7) + ht.ExpValidated(t, 7, []types.OptimisticUpdate{testOptUpdate3}) // server 3 disconnected without proving period 3, its announced head should be dropped ts.RemoveServer(testServer3) - ts.Run(6) - ht.ExpValidated(t, 6, nil) + ts.Run(8) + ht.ExpValidated(t, 8, nil) chain.SetNextSyncPeriod(3) - ts.Run(7) - // testSHead4 could be validated now but it's not queued by any registered server - ht.ExpValidated(t, 7, nil) + ts.Run(9) + // testOptUpdate4 could be validated now but it's not queued by any registered server + ht.ExpValidated(t, 9, nil) - ts.ServerEvent(EvNewSignedHead, testServer2, testSHead4) - ts.Run(8) - // now testSHead4 should be validated - ht.ExpValidated(t, 8, []types.SignedHeader{testSHead4}) + ts.ServerEvent(EvNewFinalityUpdate, testServer2, finality(testOptUpdate4)) + ts.ServerEvent(EvNewOptimisticUpdate, testServer2, testOptUpdate4) + ts.Run(10) + // now testOptUpdate4 should be validated + ht.ExpValidated(t, 10, []types.OptimisticUpdate{testOptUpdate4}) } func TestPrefetchHead(t *testing.T) { diff --git a/beacon/light/sync/test_helpers.go b/beacon/light/sync/test_helpers.go index 9f57ceebe4..cfca8ad8a4 100644 --- a/beacon/light/sync/test_helpers.go +++ b/beacon/light/sync/test_helpers.go @@ -212,32 +212,37 @@ func (tc *TestCommitteeChain) ExpNextSyncPeriod(t *testing.T, expNsp uint64) { type TestHeadTracker struct { phead types.HeadInfo - validated []types.SignedHeader + validated []types.OptimisticUpdate + finality types.FinalityUpdate } -func (ht *TestHeadTracker) ValidateHead(head types.SignedHeader) (bool, error) { - ht.validated = append(ht.validated, head) +func (ht *TestHeadTracker) ValidateOptimistic(update types.OptimisticUpdate) (bool, error) { + ht.validated = append(ht.validated, update) return true, nil } -// TODO add test case for finality -func (ht *TestHeadTracker) ValidateFinality(head types.FinalityUpdate) (bool, error) { +func (ht *TestHeadTracker) ValidateFinality(update types.FinalityUpdate) (bool, error) { + ht.finality = update return true, nil } -func (ht *TestHeadTracker) ExpValidated(t *testing.T, tci int, expHeads []types.SignedHeader) { +func (ht *TestHeadTracker) ValidatedFinality() (types.FinalityUpdate, bool) { + return ht.finality, ht.finality.Attested.Header != (types.Header{}) +} + +func (ht *TestHeadTracker) ExpValidated(t *testing.T, tci int, expHeads []types.OptimisticUpdate) { for i, expHead := range expHeads { if i >= len(ht.validated) { - t.Errorf("Missing validated head in test case #%d index #%d (expected {slot %d blockRoot %x}, got none)", tci, i, expHead.Header.Slot, expHead.Header.Hash()) + t.Errorf("Missing validated head in test case #%d index #%d (expected {slot %d blockRoot %x}, got none)", tci, i, expHead.Attested.Header.Slot, expHead.Attested.Header.Hash()) continue } - if ht.validated[i] != expHead { - vhead := ht.validated[i].Header - t.Errorf("Wrong validated head in test case #%d index #%d (expected {slot %d blockRoot %x}, got {slot %d blockRoot %x})", tci, i, expHead.Header.Slot, expHead.Header.Hash(), vhead.Slot, vhead.Hash()) + if !reflect.DeepEqual(ht.validated[i], expHead) { + vhead := ht.validated[i].Attested.Header + t.Errorf("Wrong validated head in test case #%d index #%d (expected {slot %d blockRoot %x}, got {slot %d blockRoot %x})", tci, i, expHead.Attested.Header.Slot, expHead.Attested.Header.Hash(), vhead.Slot, vhead.Hash()) } } for i := len(expHeads); i < len(ht.validated); i++ { - vhead := ht.validated[i].Header + vhead := ht.validated[i].Attested.Header t.Errorf("Unexpected validated head in test case #%d index #%d (expected none, got {slot %d blockRoot %x})", tci, i, vhead.Slot, vhead.Hash()) } ht.validated = nil diff --git a/beacon/light/sync/types.go b/beacon/light/sync/types.go index 8aa4c95f46..97a3fb2111 100644 --- a/beacon/light/sync/types.go +++ b/beacon/light/sync/types.go @@ -23,9 +23,9 @@ import ( ) var ( - EvNewHead = &request.EventType{Name: "newHead"} // data: types.HeadInfo - EvNewSignedHead = &request.EventType{Name: "newSignedHead"} // data: types.SignedHeader - EvNewFinalityUpdate = &request.EventType{Name: "newFinalityUpdate"} // data: types.FinalityUpdate + EvNewHead = &request.EventType{Name: "newHead"} // data: types.HeadInfo + EvNewOptimisticUpdate = &request.EventType{Name: "newOptimisticUpdate"} // data: types.OptimisticUpdate + EvNewFinalityUpdate = &request.EventType{Name: "newFinalityUpdate"} // data: types.FinalityUpdate ) type ( @@ -43,4 +43,5 @@ type ( } ReqCheckpointData common.Hash ReqBeaconBlock common.Hash + ReqFinality struct{} ) diff --git a/beacon/light/sync/update_sync.go b/beacon/light/sync/update_sync.go index 71801b1b60..9549ee5992 100644 --- a/beacon/light/sync/update_sync.go +++ b/beacon/light/sync/update_sync.go @@ -84,6 +84,7 @@ func (s *CheckpointInit) Process(requester request.Requester, events []request.E if s.initialized { return } + for _, event := range events { switch event.Type { case request.EvResponse, request.EvFail, request.EvTimeout: @@ -132,10 +133,12 @@ func (s *CheckpointInit) Process(requester request.Requester, events []request.E newState.state = ssPrintStatus s.serverState[sid.Server] = newState } + case request.EvUnregistered: delete(s.serverState, event.Server) } } + // start a request if possible for _, server := range requester.CanSendTo() { switch s.serverState[server].state { @@ -156,6 +159,7 @@ func (s *CheckpointInit) Process(requester request.Requester, events []request.E s.serverState[server] = newState } } + // print log message if necessary for server, state := range s.serverState { if state.state != ssPrintStatus { @@ -316,9 +320,9 @@ func (s *ForwardUpdateSync) Process(requester request.Requester, events []reques if !queued { s.unlockRange(sid, req) } - case EvNewSignedHead: - signedHead := event.Data.(types.SignedHeader) - s.nextSyncPeriod[event.Server] = types.SyncPeriod(signedHead.SignatureSlot + 256) + case EvNewOptimisticUpdate: + update := event.Data.(types.OptimisticUpdate) + s.nextSyncPeriod[event.Server] = types.SyncPeriod(update.SignatureSlot + 256) case request.EvUnregistered: delete(s.nextSyncPeriod, event.Server) } diff --git a/beacon/light/sync/update_sync_test.go b/beacon/light/sync/update_sync_test.go index 1c4b3d6d76..8329bf28c9 100644 --- a/beacon/light/sync/update_sync_test.go +++ b/beacon/light/sync/update_sync_test.go @@ -68,9 +68,9 @@ func TestUpdateSyncParallel(t *testing.T) { ts := NewTestScheduler(t, updateSync) // add 2 servers, head at period 100; allow 3-3 parallel requests for each ts.AddServer(testServer1, 3) - ts.ServerEvent(EvNewSignedHead, testServer1, types.SignedHeader{SignatureSlot: 0x2000*100 + 0x1000}) + ts.ServerEvent(EvNewOptimisticUpdate, testServer1, types.OptimisticUpdate{SignatureSlot: 0x2000*100 + 0x1000}) ts.AddServer(testServer2, 3) - ts.ServerEvent(EvNewSignedHead, testServer2, types.SignedHeader{SignatureSlot: 0x2000*100 + 0x1000}) + ts.ServerEvent(EvNewOptimisticUpdate, testServer2, types.OptimisticUpdate{SignatureSlot: 0x2000*100 + 0x1000}) // expect 6 requests to be sent ts.Run(1, @@ -150,11 +150,11 @@ func TestUpdateSyncDifferentHeads(t *testing.T) { ts := NewTestScheduler(t, updateSync) // add 3 servers with different announced head periods ts.AddServer(testServer1, 1) - ts.ServerEvent(EvNewSignedHead, testServer1, types.SignedHeader{SignatureSlot: 0x2000*15 + 0x1000}) + ts.ServerEvent(EvNewOptimisticUpdate, testServer1, types.OptimisticUpdate{SignatureSlot: 0x2000*15 + 0x1000}) ts.AddServer(testServer2, 1) - ts.ServerEvent(EvNewSignedHead, testServer2, types.SignedHeader{SignatureSlot: 0x2000*16 + 0x1000}) + ts.ServerEvent(EvNewOptimisticUpdate, testServer2, types.OptimisticUpdate{SignatureSlot: 0x2000*16 + 0x1000}) ts.AddServer(testServer3, 1) - ts.ServerEvent(EvNewSignedHead, testServer3, types.SignedHeader{SignatureSlot: 0x2000*17 + 0x1000}) + ts.ServerEvent(EvNewOptimisticUpdate, testServer3, types.OptimisticUpdate{SignatureSlot: 0x2000*17 + 0x1000}) // expect request to the best announced head ts.Run(1, testServer3, ReqUpdates{FirstPeriod: 10, Count: 7}) @@ -190,7 +190,7 @@ func TestUpdateSyncDifferentHeads(t *testing.T) { // a new server is registered with announced head period 17 ts.AddServer(testServer4, 1) - ts.ServerEvent(EvNewSignedHead, testServer4, types.SignedHeader{SignatureSlot: 0x2000*17 + 0x1000}) + ts.ServerEvent(EvNewOptimisticUpdate, testServer4, types.OptimisticUpdate{SignatureSlot: 0x2000*17 + 0x1000}) // expect request to sync one more period ts.Run(7, testServer4, ReqUpdates{FirstPeriod: 16, Count: 1}) diff --git a/beacon/types/exec_payload.go b/beacon/types/exec_payload.go index 604de288d2..718f98f529 100644 --- a/beacon/types/exec_payload.go +++ b/beacon/types/exec_payload.go @@ -66,9 +66,8 @@ func convertPayload[T payloadType](payload T, parentRoot *zrntcommon.Root) (*typ block := types.NewBlockWithHeader(&header) block = block.WithBody(transactions, nil) block = block.WithWithdrawals(withdrawals) - hash := block.Hash() - if hash != expectedHash { - return block, fmt.Errorf("Sanity check failed, payload hash does not match (expected %x, got %x)", expectedHash, hash) + if hash := block.Hash(); hash != expectedHash { + return nil, fmt.Errorf("Sanity check failed, payload hash does not match (expected %x, got %x)", expectedHash, hash) } return block, nil } diff --git a/beacon/types/light_sync.go b/beacon/types/light_sync.go index 62becdb21c..3e9b13d0e2 100644 --- a/beacon/types/light_sync.go +++ b/beacon/types/light_sync.go @@ -23,7 +23,7 @@ import ( "github.com/ethereum/go-ethereum/beacon/merkle" "github.com/ethereum/go-ethereum/beacon/params" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + ctypes "github.com/ethereum/go-ethereum/core/types" ) // HeadInfo represents an unvalidated new head announcement. @@ -142,17 +142,57 @@ func (u UpdateScore) BetterThan(w UpdateScore) bool { return u.SignerCount > w.SignerCount } +// HeaderWithExecProof contains a beacon header and proves the belonging execution +// payload header with a Merkle proof. type HeaderWithExecProof struct { Header PayloadHeader *ExecutionHeader PayloadBranch merkle.Values } +// Validate verifies the Merkle proof of the execution payload header. func (h *HeaderWithExecProof) Validate() error { - payloadRoot := h.PayloadHeader.PayloadRoot() - return merkle.VerifyProof(h.BodyRoot, params.BodyIndexExecPayload, h.PayloadBranch, payloadRoot) + return merkle.VerifyProof(h.BodyRoot, params.BodyIndexExecPayload, h.PayloadBranch, h.PayloadHeader.PayloadRoot()) } +// OptimisticUpdate proves sync committee commitment on the attested beacon header. +// It also proves the belonging execution payload header with a Merkle proof. +// +// See data structure definition here: +// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate +type OptimisticUpdate struct { + Attested HeaderWithExecProof + // Sync committee BLS signature aggregate + Signature SyncAggregate + // Slot in which the signature has been created (newer than Header.Slot, + // determines the signing sync committee) + SignatureSlot uint64 +} + +// SignedHeader returns the signed attested header of the update. +func (u *OptimisticUpdate) SignedHeader() SignedHeader { + return SignedHeader{ + Header: u.Attested.Header, + Signature: u.Signature, + SignatureSlot: u.SignatureSlot, + } +} + +// Validate verifies the Merkle proof proving the execution payload header. +// Note that the sync committee signature of the attested header should be +// verified separately by a synced committee chain. +func (u *OptimisticUpdate) Validate() error { + return u.Attested.Validate() +} + +// FinalityUpdate proves a finalized beacon header by a sync committee commitment +// on an attested beacon header, referring to the latest finalized header with a +// Merkle proof. +// It also proves the execution payload header belonging to both the attested and +// the finalized beacon header with Merkle proofs. +// +// See data structure definition here: +// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientfinalityupdate type FinalityUpdate struct { Attested, Finalized HeaderWithExecProof FinalityBranch merkle.Values @@ -163,6 +203,7 @@ type FinalityUpdate struct { SignatureSlot uint64 } +// SignedHeader returns the signed attested header of the update. func (u *FinalityUpdate) SignedHeader() SignedHeader { return SignedHeader{ Header: u.Attested.Header, @@ -171,6 +212,10 @@ func (u *FinalityUpdate) SignedHeader() SignedHeader { } } +// Validate verifies the Merkle proofs proving the finalized beacon header and +// the execution payload headers belonging to the attested and finalized headers. +// Note that the sync committee signature of the attested header should be +// verified separately by a synced committee chain. func (u *FinalityUpdate) Validate() error { if err := u.Attested.Validate(); err != nil { return err @@ -186,6 +231,6 @@ func (u *FinalityUpdate) Validate() error { // finalized execution block. type ChainHeadEvent struct { BeaconHead Header - Block *types.Block + Block *ctypes.Block Finalized common.Hash }