les/vflux/client, p2p/nodestate: fix data races (#24058)

Fixes #23848
pull/24107/head
Felföldi Zsolt 3 years ago committed by GitHub
parent 155795be99
commit fc01a7ce8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      les/vflux/client/serverpool_test.go
  2. 9
      p2p/nodestate/nodestate.go

@ -63,7 +63,11 @@ type ServerPoolTest struct {
trusted []string trusted []string
waitCount, waitEnded int32 waitCount, waitEnded int32
lock sync.Mutex // preNegLock protects the cycle counter, testNodes list and its connected field
// (accessed from both the main thread and the preNeg callback)
preNegLock sync.Mutex
queryWg *sync.WaitGroup // a new wait group is created each time the simulation is started
stopping bool // stopping avoid callind queryWg.Add after queryWg.Wait
cycle, conn, servedConn int cycle, conn, servedConn int
serviceCycles, dialCount int serviceCycles, dialCount int
@ -111,13 +115,21 @@ func (s *ServerPoolTest) addTrusted(i int) {
func (s *ServerPoolTest) start() { func (s *ServerPoolTest) start() {
var testQuery QueryFunc var testQuery QueryFunc
s.queryWg = new(sync.WaitGroup)
if s.preNeg { if s.preNeg {
testQuery = func(node *enode.Node) int { testQuery = func(node *enode.Node) int {
s.preNegLock.Lock()
if s.stopping {
s.preNegLock.Unlock()
return 0
}
s.queryWg.Add(1)
idx := testNodeIndex(node.ID()) idx := testNodeIndex(node.ID())
n := &s.testNodes[idx] n := &s.testNodes[idx]
s.lock.Lock()
canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle
s.lock.Unlock() s.preNegLock.Unlock()
defer s.queryWg.Done()
if s.preNegFail { if s.preNegFail {
// simulate a scenario where UDP queries never work // simulate a scenario where UDP queries never work
s.beginWait() s.beginWait()
@ -181,11 +193,20 @@ func (s *ServerPoolTest) start() {
} }
func (s *ServerPoolTest) stop() { func (s *ServerPoolTest) stop() {
// disable further queries and wait if one is currently running
s.preNegLock.Lock()
s.stopping = true
s.preNegLock.Unlock()
s.queryWg.Wait()
quit := make(chan struct{}) quit := make(chan struct{})
s.quit <- quit s.quit <- quit
<-quit <-quit
s.sp.Stop() s.sp.Stop()
s.spi.Close() s.spi.Close()
s.preNegLock.Lock()
s.stopping = false
s.preNegLock.Unlock()
for i := range s.testNodes { for i := range s.testNodes {
n := &s.testNodes[i] n := &s.testNodes[i]
if n.connected { if n.connected {
@ -205,7 +226,9 @@ func (s *ServerPoolTest) run() {
n := &s.testNodes[idx] n := &s.testNodes[idx]
s.sp.UnregisterNode(n.node) s.sp.UnregisterNode(n.node)
n.totalConn += s.cycle n.totalConn += s.cycle
s.preNegLock.Lock()
n.connected = false n.connected = false
s.preNegLock.Unlock()
n.node = nil n.node = nil
s.conn-- s.conn--
if n.service { if n.service {
@ -230,7 +253,9 @@ func (s *ServerPoolTest) run() {
s.servedConn++ s.servedConn++
} }
n.totalConn -= s.cycle n.totalConn -= s.cycle
s.preNegLock.Lock()
n.connected = true n.connected = true
s.preNegLock.Unlock()
dc := s.cycle + n.connectCycles dc := s.cycle + n.connectCycles
s.disconnect[dc] = append(s.disconnect[dc], idx) s.disconnect[dc] = append(s.disconnect[dc], idx)
n.node = dial n.node = dial
@ -242,9 +267,9 @@ func (s *ServerPoolTest) run() {
} }
s.serviceCycles += s.servedConn s.serviceCycles += s.servedConn
s.clock.Run(time.Second) s.clock.Run(time.Second)
s.lock.Lock() s.preNegLock.Lock()
s.cycle++ s.cycle++
s.lock.Unlock() s.preNegLock.Unlock()
} }
} }
@ -255,11 +280,13 @@ func (s *ServerPoolTest) setNodes(count, conn, wait int, service, trusted bool)
idx = rand.Intn(spTestNodes) idx = rand.Intn(spTestNodes)
} }
res = append(res, idx) res = append(res, idx)
s.preNegLock.Lock()
s.testNodes[idx] = spTestNode{ s.testNodes[idx] = spTestNode{
connectCycles: conn, connectCycles: conn,
waitCycles: wait, waitCycles: wait,
service: service, service: service,
} }
s.preNegLock.Unlock()
if trusted { if trusted {
s.addTrusted(idx) s.addTrusted(idx)
} }
@ -273,7 +300,9 @@ func (s *ServerPoolTest) resetNodes() {
n.totalConn += s.cycle n.totalConn += s.cycle
s.sp.UnregisterNode(n.node) s.sp.UnregisterNode(n.node)
} }
s.preNegLock.Lock()
s.testNodes[i] = spTestNode{totalConn: n.totalConn} s.testNodes[i] = spTestNode{totalConn: n.totalConn}
s.preNegLock.Unlock()
} }
s.conn, s.servedConn = 0, 0 s.conn, s.servedConn = 0, 0
s.disconnect = make(map[int][]int) s.disconnect = make(map[int][]int)

@ -808,7 +808,14 @@ func (ns *NodeStateMachine) addTimeout(n *enode.Node, mask bitMask, timeout time
ns.removeTimeouts(node, mask) ns.removeTimeouts(node, mask)
t := &nodeStateTimeout{mask: mask} t := &nodeStateTimeout{mask: mask}
t.timer = ns.clock.AfterFunc(timeout, func() { t.timer = ns.clock.AfterFunc(timeout, func() {
ns.SetState(n, Flags{}, Flags{mask: t.mask, setup: ns.setup}, 0) ns.lock.Lock()
defer ns.lock.Unlock()
if !ns.opStart() {
return
}
ns.setState(n, Flags{}, Flags{mask: t.mask, setup: ns.setup}, 0)
ns.opFinish()
}) })
node.timeouts = append(node.timeouts, t) node.timeouts = append(node.timeouts, t)
if mask&ns.saveFlags != 0 { if mask&ns.saveFlags != 0 {

Loading…
Cancel
Save