diff --git a/les/client_handler.go b/les/client_handler.go index b903b1106..4a550b207 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -100,11 +100,11 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) defer peer.close() h.wg.Add(1) defer h.wg.Done() - err := h.handle(peer) + err := h.handle(peer, false) return err } -func (h *clientHandler) handle(p *serverPeer) error { +func (h *clientHandler) handle(p *serverPeer, noInitAnnounce bool) error { if h.backend.peers.len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted { return p2p.DiscTooManyPeers } @@ -143,8 +143,11 @@ func (h *clientHandler) handle(p *serverPeer) error { connectionTimer.Update(time.Duration(mclock.Now() - connectedAt)) serverConnectionGauge.Update(int64(h.backend.peers.len())) }() - h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}) - + // It's mainly used in testing which requires discarding initial + // signal to prevent syncing. + if !noInitAnnounce { + h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}) + } // Mark the peer starts to be served. atomic.StoreUint32(&p.serving, 1) defer atomic.StoreUint32(&p.serving, 0) diff --git a/les/fetcher.go b/les/fetcher.go index a6d1c93c4..5eea99674 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -153,9 +153,7 @@ type lightFetcher struct { synchronise func(peer *serverPeer) // Test fields or hooks - noAnnounce bool newHeadHook func(*types.Header) - newAnnounce func(*serverPeer, *announceData) } // newLightFetcher creates a light fetcher instance. @@ -474,12 +472,6 @@ func (f *lightFetcher) mainloop() { // announce processes a new announcement message received from a peer. func (f *lightFetcher) announce(p *serverPeer, head *announceData) { - if f.newAnnounce != nil { - f.newAnnounce(p, head) - } - if f.noAnnounce { - return - } select { case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}: case <-f.closeCh: diff --git a/les/fetcher_test.go b/les/fetcher_test.go index d3a74d25c..ef700651e 100644 --- a/les/fetcher_test.go +++ b/les/fetcher_test.go @@ -74,14 +74,12 @@ func testSequentialAnnouncements(t *testing.T, protocol int) { s, c, teardown := newClientServerEnv(t, netconfig) defer teardown() - // Create connected peer pair. - c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. - p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler) + // Create connected peer pair, the initial signal from LES server + // is discarded to prevent syncing. + p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler, true) if err != nil { t.Fatalf("Failed to create peer pair %v", err) } - c.handler.fetcher.noAnnounce = false - importCh := make(chan interface{}) c.handler.fetcher.newHeadHook = func(header *types.Header) { importCh <- header @@ -114,14 +112,12 @@ func testGappedAnnouncements(t *testing.T, protocol int) { s, c, teardown := newClientServerEnv(t, netconfig) defer teardown() - // Create connected peer pair. - c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. - peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler) + // Create connected peer pair, the initial signal from LES server + // is discarded to prevent syncing. + peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler, true) if err != nil { t.Fatalf("Failed to create peer pair %v", err) } - c.handler.fetcher.noAnnounce = false - done := make(chan *types.Header, 1) c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header } @@ -141,29 +137,11 @@ func testGappedAnnouncements(t *testing.T, protocol int) { verifyChainHeight(t, c.handler.fetcher, 4) // Send a reorged announcement - var newAnno = make(chan struct{}, 1) - c.handler.fetcher.noAnnounce = true - c.handler.fetcher.newAnnounce = func(*serverPeer, *announceData) { - newAnno <- struct{}{} - } blocks, _ := core.GenerateChain(rawdb.ReadChainConfig(s.db, s.backend.Blockchain().Genesis().Hash()), s.backend.Blockchain().GetBlockByNumber(3), ethash.NewFaker(), s.db, 2, func(i int, gen *core.BlockGen) { gen.OffsetTime(-9) // higher block difficulty }) s.backend.Blockchain().InsertChain(blocks) - <-newAnno - c.handler.fetcher.noAnnounce = false - c.handler.fetcher.newAnnounce = nil - - latest = blocks[len(blocks)-1].Header() - hash, number = latest.Hash(), latest.Number.Uint64() - td = rawdb.ReadTd(s.db, hash, number) - - announce = announceData{hash, number, td, 1, nil} - if peer.cpeer.announceType == announceTypeSigned { - announce.sign(s.handler.server.privateKey) - } - peer.cpeer.sendAnnounce(announce) <-done // Wait syncing verifyChainHeight(t, c.handler.fetcher, 5) @@ -206,20 +184,15 @@ func testTrustedAnnouncement(t *testing.T, protocol int) { teardowns[i]() } }() - - c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. - // Connect all server instances. for i := 0; i < len(servers); i++ { - sp, cp, err := connect(servers[i].handler, nodes[i].ID(), c.handler, protocol) + sp, cp, err := connect(servers[i].handler, nodes[i].ID(), c.handler, protocol, true) if err != nil { t.Fatalf("connect server and client failed, err %s", err) } cpeers = append(cpeers, cp) speers = append(speers, sp) } - c.handler.fetcher.noAnnounce = false - newHead := make(chan *types.Header, 1) c.handler.fetcher.newHeadHook = func(header *types.Header) { newHead <- header } @@ -262,14 +235,12 @@ func testInvalidAnnounces(t *testing.T, protocol int) { s, c, teardown := newClientServerEnv(t, netconfig) defer teardown() - // Create connected peer pair. - c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. - peer, _, err := newTestPeerPair("peer", lpv3, s.handler, c.handler) + // Create connected peer pair, the initial signal from LES server + // is discarded to prevent syncing. + peer, _, err := newTestPeerPair("peer", lpv3, s.handler, c.handler, true) if err != nil { t.Fatalf("Failed to create peer pair %v", err) } - c.handler.fetcher.noAnnounce = false - done := make(chan *types.Header, 1) c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header } diff --git a/les/odr_test.go b/les/odr_test.go index ea88495d1..ad77abf5b 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -401,9 +401,9 @@ func testGetTxStatusFromUnindexedPeers(t *testing.T, protocol int) { closeFns = append(closeFns, closePeer) // Create a one-time routine for serving message - go func(i int, peer *testPeer) { - serveMsg(peer, testspec.txLookups[i]) - }(i, peer) + go func(i int, peer *testPeer, lookup uint64) { + serveMsg(peer, lookup) + }(i, peer, testspec.txLookups[i]) } // Send out the GetTxStatus requests, compare the result with diff --git a/les/server_handler.go b/les/server_handler.go index 80fcf1c44..fa20fd7b3 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/light" @@ -408,7 +407,7 @@ func (h *serverHandler) broadcastLoop() { defer headSub.Unsubscribe() var ( - lastHead *types.Header + lastHead = h.blockchain.CurrentHeader() lastTd = common.Big0 ) for { diff --git a/les/sync_test.go b/les/sync_test.go index d3bb90df0..3fc2a9c15 100644 --- a/les/sync_test.go +++ b/les/sync_test.go @@ -116,7 +116,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { } // Create connected peer pair. - peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler) + peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler, false) if err != nil { t.Fatalf("Failed to connect testing peers %v", err) } @@ -218,7 +218,7 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool, protocol int) { } } // Create connected peer pair. - if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { + if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler, false); err != nil { t.Fatalf("Failed to connect testing peers %v", err) } select { @@ -291,7 +291,7 @@ func testSyncFromConfiguredCheckpoint(t *testing.T, protocol int) { } } // Create connected peer pair. - if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { + if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler, false); err != nil { t.Fatalf("Failed to connect testing peers %v", err) } @@ -364,7 +364,7 @@ func testSyncAll(t *testing.T, protocol int) { } } // Create connected peer pair. - if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { + if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler, false); err != nil { t.Fatalf("Failed to connect testing peers %v", err) } diff --git a/les/test_helper.go b/les/test_helper.go index 9ff2583b9..21d0f191c 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -398,7 +398,7 @@ func (p *testPeer) close() { p.app.Close() } -func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) { +func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler, noInitAnnounce bool) (*testPeer, *testPeer, error) { // Create a message pipe to communicate through app, net := p2p.MsgPipe() @@ -423,16 +423,16 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl select { case <-client.closeCh: errc2 <- p2p.DiscQuitting - case errc2 <- client.handle(peer2): + case errc2 <- client.handle(peer2, noInitAnnounce): } }() // Ensure the connection is established or exits when any error occurs for { select { case err := <-errc1: - return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err) + return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) case err := <-errc2: - return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err) + return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) default: } if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { @@ -473,7 +473,7 @@ func (client *testClient) newRawPeer(t *testing.T, name string, version int, rec select { case <-client.handler.closeCh: errCh <- p2p.DiscQuitting - case errCh <- client.handler.handle(peer): + case errCh <- client.handler.handle(peer, false): } }() tp := &testPeer{ @@ -623,7 +623,7 @@ func newClientServerEnv(t *testing.T, config testnetConfig) (*testServer, *testC if config.connect { done := make(chan struct{}) client.syncEnd = func(_ *types.Header) { close(done) } - cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client) + cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client, false) if err != nil { t.Fatalf("Failed to connect testing peers %v", err) } diff --git a/les/ulc_test.go b/les/ulc_test.go index d7308fa59..ecef58d97 100644 --- a/les/ulc_test.go +++ b/les/ulc_test.go @@ -20,6 +20,7 @@ import ( "crypto/rand" "fmt" "net" + "sync/atomic" "testing" "time" @@ -65,7 +66,7 @@ func testULCAnnounceThreshold(t *testing.T, protocol int) { // Connect all servers. for i := 0; i < len(servers); i++ { - connect(servers[i].handler, nodes[i].ID(), c.handler, protocol) + connect(servers[i].handler, nodes[i].ID(), c.handler, protocol, false) } for i := 0; i < len(servers); i++ { for j := 0; j < testcase.height[i]; j++ { @@ -86,7 +87,7 @@ func testULCAnnounceThreshold(t *testing.T, protocol int) { } } -func connect(server *serverHandler, serverId enode.ID, client *clientHandler, protocol int) (*serverPeer, *clientPeer, error) { +func connect(server *serverHandler, serverId enode.ID, client *clientHandler, protocol int, noInitAnnounce bool) (*serverPeer, *clientPeer, error) { // Create a message pipe to communicate through app, net := p2p.MsgPipe() @@ -110,16 +111,22 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr select { case <-client.closeCh: errc1 <- p2p.DiscQuitting - case errc1 <- client.handle(peer1): + case errc1 <- client.handle(peer1, noInitAnnounce): } }() - - select { - case <-time.After(time.Millisecond * 100): - case err := <-errc1: - return nil, nil, fmt.Errorf("peerLight handshake error: %v", err) - case err := <-errc2: - return nil, nil, fmt.Errorf("peerFull handshake error: %v", err) + // Ensure the connection is established or exits when any error occurs + for { + select { + case err := <-errc1: + return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) + case err := <-errc2: + return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) + default: + } + if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { + break + } + time.Sleep(50 * time.Millisecond) } return peer1, peer2, nil }