les: fix data races in tests (#23457)

pull/23461/head
gary rong 3 years ago committed by GitHub
parent fe2f153b55
commit 83ad92c421
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      les/client_handler.go
  2. 8
      les/fetcher.go
  3. 49
      les/fetcher_test.go
  4. 6
      les/odr_test.go
  5. 3
      les/server_handler.go
  6. 8
      les/sync_test.go
  7. 12
      les/test_helper.go
  8. 27
      les/ulc_test.go

@ -100,11 +100,11 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
defer peer.close() defer peer.close()
h.wg.Add(1) h.wg.Add(1)
defer h.wg.Done() defer h.wg.Done()
err := h.handle(peer) err := h.handle(peer, false)
return err return err
} }
func (h *clientHandler) handle(p *serverPeer) error { func (h *clientHandler) handle(p *serverPeer, noInitAnnounce bool) error {
if h.backend.peers.len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted { if h.backend.peers.len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers return p2p.DiscTooManyPeers
} }
@ -143,8 +143,11 @@ func (h *clientHandler) handle(p *serverPeer) error {
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt)) connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
serverConnectionGauge.Update(int64(h.backend.peers.len())) serverConnectionGauge.Update(int64(h.backend.peers.len()))
}() }()
h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}) // It's mainly used in testing which requires discarding initial
// signal to prevent syncing.
if !noInitAnnounce {
h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td})
}
// Mark the peer starts to be served. // Mark the peer starts to be served.
atomic.StoreUint32(&p.serving, 1) atomic.StoreUint32(&p.serving, 1)
defer atomic.StoreUint32(&p.serving, 0) defer atomic.StoreUint32(&p.serving, 0)

@ -153,9 +153,7 @@ type lightFetcher struct {
synchronise func(peer *serverPeer) synchronise func(peer *serverPeer)
// Test fields or hooks // Test fields or hooks
noAnnounce bool
newHeadHook func(*types.Header) newHeadHook func(*types.Header)
newAnnounce func(*serverPeer, *announceData)
} }
// newLightFetcher creates a light fetcher instance. // newLightFetcher creates a light fetcher instance.
@ -474,12 +472,6 @@ func (f *lightFetcher) mainloop() {
// announce processes a new announcement message received from a peer. // announce processes a new announcement message received from a peer.
func (f *lightFetcher) announce(p *serverPeer, head *announceData) { func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
if f.newAnnounce != nil {
f.newAnnounce(p, head)
}
if f.noAnnounce {
return
}
select { select {
case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}: case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}:
case <-f.closeCh: case <-f.closeCh:

@ -74,14 +74,12 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
s, c, teardown := newClientServerEnv(t, netconfig) s, c, teardown := newClientServerEnv(t, netconfig)
defer teardown() defer teardown()
// Create connected peer pair. // Create connected peer pair, the initial signal from LES server
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. // is discarded to prevent syncing.
p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler) p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler, true)
if err != nil { if err != nil {
t.Fatalf("Failed to create peer pair %v", err) t.Fatalf("Failed to create peer pair %v", err)
} }
c.handler.fetcher.noAnnounce = false
importCh := make(chan interface{}) importCh := make(chan interface{})
c.handler.fetcher.newHeadHook = func(header *types.Header) { c.handler.fetcher.newHeadHook = func(header *types.Header) {
importCh <- header importCh <- header
@ -114,14 +112,12 @@ func testGappedAnnouncements(t *testing.T, protocol int) {
s, c, teardown := newClientServerEnv(t, netconfig) s, c, teardown := newClientServerEnv(t, netconfig)
defer teardown() defer teardown()
// Create connected peer pair. // Create connected peer pair, the initial signal from LES server
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. // is discarded to prevent syncing.
peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler) peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler, true)
if err != nil { if err != nil {
t.Fatalf("Failed to create peer pair %v", err) t.Fatalf("Failed to create peer pair %v", err)
} }
c.handler.fetcher.noAnnounce = false
done := make(chan *types.Header, 1) done := make(chan *types.Header, 1)
c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header } c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header }
@ -141,29 +137,11 @@ func testGappedAnnouncements(t *testing.T, protocol int) {
verifyChainHeight(t, c.handler.fetcher, 4) verifyChainHeight(t, c.handler.fetcher, 4)
// Send a reorged announcement // Send a reorged announcement
var newAnno = make(chan struct{}, 1)
c.handler.fetcher.noAnnounce = true
c.handler.fetcher.newAnnounce = func(*serverPeer, *announceData) {
newAnno <- struct{}{}
}
blocks, _ := core.GenerateChain(rawdb.ReadChainConfig(s.db, s.backend.Blockchain().Genesis().Hash()), s.backend.Blockchain().GetBlockByNumber(3), blocks, _ := core.GenerateChain(rawdb.ReadChainConfig(s.db, s.backend.Blockchain().Genesis().Hash()), s.backend.Blockchain().GetBlockByNumber(3),
ethash.NewFaker(), s.db, 2, func(i int, gen *core.BlockGen) { ethash.NewFaker(), s.db, 2, func(i int, gen *core.BlockGen) {
gen.OffsetTime(-9) // higher block difficulty gen.OffsetTime(-9) // higher block difficulty
}) })
s.backend.Blockchain().InsertChain(blocks) s.backend.Blockchain().InsertChain(blocks)
<-newAnno
c.handler.fetcher.noAnnounce = false
c.handler.fetcher.newAnnounce = nil
latest = blocks[len(blocks)-1].Header()
hash, number = latest.Hash(), latest.Number.Uint64()
td = rawdb.ReadTd(s.db, hash, number)
announce = announceData{hash, number, td, 1, nil}
if peer.cpeer.announceType == announceTypeSigned {
announce.sign(s.handler.server.privateKey)
}
peer.cpeer.sendAnnounce(announce)
<-done // Wait syncing <-done // Wait syncing
verifyChainHeight(t, c.handler.fetcher, 5) verifyChainHeight(t, c.handler.fetcher, 5)
@ -206,20 +184,15 @@ func testTrustedAnnouncement(t *testing.T, protocol int) {
teardowns[i]() teardowns[i]()
} }
}() }()
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync.
// Connect all server instances. // Connect all server instances.
for i := 0; i < len(servers); i++ { for i := 0; i < len(servers); i++ {
sp, cp, err := connect(servers[i].handler, nodes[i].ID(), c.handler, protocol) sp, cp, err := connect(servers[i].handler, nodes[i].ID(), c.handler, protocol, true)
if err != nil { if err != nil {
t.Fatalf("connect server and client failed, err %s", err) t.Fatalf("connect server and client failed, err %s", err)
} }
cpeers = append(cpeers, cp) cpeers = append(cpeers, cp)
speers = append(speers, sp) speers = append(speers, sp)
} }
c.handler.fetcher.noAnnounce = false
newHead := make(chan *types.Header, 1) newHead := make(chan *types.Header, 1)
c.handler.fetcher.newHeadHook = func(header *types.Header) { newHead <- header } c.handler.fetcher.newHeadHook = func(header *types.Header) { newHead <- header }
@ -262,14 +235,12 @@ func testInvalidAnnounces(t *testing.T, protocol int) {
s, c, teardown := newClientServerEnv(t, netconfig) s, c, teardown := newClientServerEnv(t, netconfig)
defer teardown() defer teardown()
// Create connected peer pair. // Create connected peer pair, the initial signal from LES server
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. // is discarded to prevent syncing.
peer, _, err := newTestPeerPair("peer", lpv3, s.handler, c.handler) peer, _, err := newTestPeerPair("peer", lpv3, s.handler, c.handler, true)
if err != nil { if err != nil {
t.Fatalf("Failed to create peer pair %v", err) t.Fatalf("Failed to create peer pair %v", err)
} }
c.handler.fetcher.noAnnounce = false
done := make(chan *types.Header, 1) done := make(chan *types.Header, 1)
c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header } c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header }

@ -401,9 +401,9 @@ func testGetTxStatusFromUnindexedPeers(t *testing.T, protocol int) {
closeFns = append(closeFns, closePeer) closeFns = append(closeFns, closePeer)
// Create a one-time routine for serving message // Create a one-time routine for serving message
go func(i int, peer *testPeer) { go func(i int, peer *testPeer, lookup uint64) {
serveMsg(peer, testspec.txLookups[i]) serveMsg(peer, lookup)
}(i, peer) }(i, peer, testspec.txLookups[i])
} }
// Send out the GetTxStatus requests, compare the result with // Send out the GetTxStatus requests, compare the result with

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/light"
@ -408,7 +407,7 @@ func (h *serverHandler) broadcastLoop() {
defer headSub.Unsubscribe() defer headSub.Unsubscribe()
var ( var (
lastHead *types.Header lastHead = h.blockchain.CurrentHeader()
lastTd = common.Big0 lastTd = common.Big0
) )
for { for {

@ -116,7 +116,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
} }
// Create connected peer pair. // Create connected peer pair.
peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler) peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler, false)
if err != nil { if err != nil {
t.Fatalf("Failed to connect testing peers %v", err) t.Fatalf("Failed to connect testing peers %v", err)
} }
@ -218,7 +218,7 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool, protocol int) {
} }
} }
// Create connected peer pair. // Create connected peer pair.
if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler, false); err != nil {
t.Fatalf("Failed to connect testing peers %v", err) t.Fatalf("Failed to connect testing peers %v", err)
} }
select { select {
@ -291,7 +291,7 @@ func testSyncFromConfiguredCheckpoint(t *testing.T, protocol int) {
} }
} }
// Create connected peer pair. // Create connected peer pair.
if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler, false); err != nil {
t.Fatalf("Failed to connect testing peers %v", err) t.Fatalf("Failed to connect testing peers %v", err)
} }
@ -364,7 +364,7 @@ func testSyncAll(t *testing.T, protocol int) {
} }
} }
// Create connected peer pair. // Create connected peer pair.
if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler, false); err != nil {
t.Fatalf("Failed to connect testing peers %v", err) t.Fatalf("Failed to connect testing peers %v", err)
} }

@ -398,7 +398,7 @@ func (p *testPeer) close() {
p.app.Close() p.app.Close()
} }
func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) { func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler, noInitAnnounce bool) (*testPeer, *testPeer, error) {
// Create a message pipe to communicate through // Create a message pipe to communicate through
app, net := p2p.MsgPipe() app, net := p2p.MsgPipe()
@ -423,16 +423,16 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
select { select {
case <-client.closeCh: case <-client.closeCh:
errc2 <- p2p.DiscQuitting errc2 <- p2p.DiscQuitting
case errc2 <- client.handle(peer2): case errc2 <- client.handle(peer2, noInitAnnounce):
} }
}() }()
// Ensure the connection is established or exits when any error occurs // Ensure the connection is established or exits when any error occurs
for { for {
select { select {
case err := <-errc1: case err := <-errc1:
return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err) return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err)
case err := <-errc2: case err := <-errc2:
return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err) return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err)
default: default:
} }
if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 {
@ -473,7 +473,7 @@ func (client *testClient) newRawPeer(t *testing.T, name string, version int, rec
select { select {
case <-client.handler.closeCh: case <-client.handler.closeCh:
errCh <- p2p.DiscQuitting errCh <- p2p.DiscQuitting
case errCh <- client.handler.handle(peer): case errCh <- client.handler.handle(peer, false):
} }
}() }()
tp := &testPeer{ tp := &testPeer{
@ -623,7 +623,7 @@ func newClientServerEnv(t *testing.T, config testnetConfig) (*testServer, *testC
if config.connect { if config.connect {
done := make(chan struct{}) done := make(chan struct{})
client.syncEnd = func(_ *types.Header) { close(done) } client.syncEnd = func(_ *types.Header) { close(done) }
cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client) cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client, false)
if err != nil { if err != nil {
t.Fatalf("Failed to connect testing peers %v", err) t.Fatalf("Failed to connect testing peers %v", err)
} }

@ -20,6 +20,7 @@ import (
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"net" "net"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -65,7 +66,7 @@ func testULCAnnounceThreshold(t *testing.T, protocol int) {
// Connect all servers. // Connect all servers.
for i := 0; i < len(servers); i++ { for i := 0; i < len(servers); i++ {
connect(servers[i].handler, nodes[i].ID(), c.handler, protocol) connect(servers[i].handler, nodes[i].ID(), c.handler, protocol, false)
} }
for i := 0; i < len(servers); i++ { for i := 0; i < len(servers); i++ {
for j := 0; j < testcase.height[i]; j++ { for j := 0; j < testcase.height[i]; j++ {
@ -86,7 +87,7 @@ func testULCAnnounceThreshold(t *testing.T, protocol int) {
} }
} }
func connect(server *serverHandler, serverId enode.ID, client *clientHandler, protocol int) (*serverPeer, *clientPeer, error) { func connect(server *serverHandler, serverId enode.ID, client *clientHandler, protocol int, noInitAnnounce bool) (*serverPeer, *clientPeer, error) {
// Create a message pipe to communicate through // Create a message pipe to communicate through
app, net := p2p.MsgPipe() app, net := p2p.MsgPipe()
@ -110,16 +111,22 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr
select { select {
case <-client.closeCh: case <-client.closeCh:
errc1 <- p2p.DiscQuitting errc1 <- p2p.DiscQuitting
case errc1 <- client.handle(peer1): case errc1 <- client.handle(peer1, noInitAnnounce):
} }
}() }()
// Ensure the connection is established or exits when any error occurs
select { for {
case <-time.After(time.Millisecond * 100): select {
case err := <-errc1: case err := <-errc1:
return nil, nil, fmt.Errorf("peerLight handshake error: %v", err) return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err)
case err := <-errc2: case err := <-errc2:
return nil, nil, fmt.Errorf("peerFull handshake error: %v", err) return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err)
default:
}
if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 {
break
}
time.Sleep(50 * time.Millisecond)
} }
return peer1, peer2, nil return peer1, peer2, nil
} }

Loading…
Cancel
Save