p2p/discover: fix UDP reply packet timeout handling

If the timeout fired (even just nanoseconds) before the deadline of the
next pending reply, the timer was not rescheduled. The timer would've
been rescheduled anyway once the next packet was sent, but there were
cases where no next packet could ever be sent due to the locking issue
fixed in the previous commit.

As timing-related bugs go, this issue had been present for a long time
and I could never reproduce it. The test added in this commit did
reproduce the issue on about one out of 15 runs.
pull/1621/head
Felix Lange 9 years ago
parent 01ed3fa1a9
commit 590c99a98f
  1. 70
      p2p/discover/udp.go
  2. 73
      p2p/discover/udp_test.go

@ -18,6 +18,7 @@ package discover
import ( import (
"bytes" "bytes"
"container/list"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
@ -43,6 +44,7 @@ var (
errUnsolicitedReply = errors.New("unsolicited reply") errUnsolicitedReply = errors.New("unsolicited reply")
errUnknownNode = errors.New("unknown node") errUnknownNode = errors.New("unknown node")
errTimeout = errors.New("RPC timeout") errTimeout = errors.New("RPC timeout")
errClockWarp = errors.New("reply deadline too far in the future")
errClosed = errors.New("socket closed") errClosed = errors.New("socket closed")
) )
@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
} }
func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
matched := make(chan bool) matched := make(chan bool, 1)
select { select {
case t.gotreply <- reply{from, ptype, req, matched}: case t.gotreply <- reply{from, ptype, req, matched}:
// loop will handle it // loop will handle it
@ -310,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
// the refresh timer and the pending reply queue. // the refresh timer and the pending reply queue.
func (t *udp) loop() { func (t *udp) loop() {
var ( var (
pending []*pending plist = list.New()
nextDeadline time.Time
timeout = time.NewTimer(0) timeout = time.NewTimer(0)
nextTimeout *pending // head of plist when timeout was last reset
refresh = time.NewTicker(refreshInterval) refresh = time.NewTicker(refreshInterval)
) )
<-timeout.C // ignore first timeout <-timeout.C // ignore first timeout
defer refresh.Stop() defer refresh.Stop()
defer timeout.Stop() defer timeout.Stop()
rearmTimeout := func() { resetTimeout := func() {
if plist.Front() == nil || nextTimeout == plist.Front().Value {
return
}
// Start the timer so it fires when the next pending reply has expired.
now := time.Now() now := time.Now()
if len(pending) == 0 || now.Before(nextDeadline) { for el := plist.Front(); el != nil; el = el.Next() {
nextTimeout = el.Value.(*pending)
if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
timeout.Reset(dist)
return return
} }
nextDeadline = pending[0].deadline // Remove pending replies whose deadline is too far in the
timeout.Reset(nextDeadline.Sub(now)) // future. These can occur if the system clock jumped
// backwards after the deadline was assigned.
nextTimeout.errc <- errClockWarp
plist.Remove(el)
}
nextTimeout = nil
timeout.Stop()
} }
for { for {
resetTimeout()
select { select {
case <-refresh.C: case <-refresh.C:
go t.refresh() go t.refresh()
case <-t.closing: case <-t.closing:
for _, p := range pending { for el := plist.Front(); el != nil; el = el.Next() {
p.errc <- errClosed el.Value.(*pending).errc <- errClosed
} }
pending = nil
return return
case p := <-t.addpending: case p := <-t.addpending:
p.deadline = time.Now().Add(respTimeout) p.deadline = time.Now().Add(respTimeout)
pending = append(pending, p) plist.PushBack(p)
rearmTimeout()
case r := <-t.gotreply: case r := <-t.gotreply:
var matched bool var matched bool
for i := 0; i < len(pending); i++ { for el := plist.Front(); el != nil; el = el.Next() {
if p := pending[i]; p.from == r.from && p.ptype == r.ptype { p := el.Value.(*pending)
if p.from == r.from && p.ptype == r.ptype {
matched = true matched = true
// Remove the matcher if its callback indicates
// that all replies have been received. This is
// required for packet types that expect multiple
// reply packets.
if p.callback(r.data) { if p.callback(r.data) {
// callback indicates the request is done, remove it.
p.errc <- nil p.errc <- nil
copy(pending[i:], pending[i+1:]) plist.Remove(el)
pending = pending[:len(pending)-1]
i--
} }
} }
} }
r.matched <- matched r.matched <- matched
case now := <-timeout.C: case now := <-timeout.C:
// notify and remove callbacks whose deadline is in the past. nextTimeout = nil
i := 0 // Notify and remove callbacks whose deadline is in the past.
for ; i < len(pending) && now.After(pending[i].deadline); i++ { for el := plist.Front(); el != nil; el = el.Next() {
pending[i].errc <- errTimeout p := el.Value.(*pending)
if now.After(p.deadline) || now.Equal(p.deadline) {
p.errc <- errTimeout
plist.Remove(el)
} }
if i > 0 {
copy(pending, pending[i:])
pending = pending[:len(pending)-i]
} }
rearmTimeout()
} }
} }
} }
@ -385,7 +401,7 @@ const (
var ( var (
headSpace = make([]byte, headSize) headSpace = make([]byte, headSize)
// Neighbors responses are sent across multiple packets to // Neighbors replies are sent across multiple packets to
// stay below the 1280 byte limit. We compute the maximum number // stay below the 1280 byte limit. We compute the maximum number
// of entries by stuffing a packet until it grows too large. // of entries by stuffing a packet until it grows too large.
maxNeighbors int maxNeighbors int

@ -19,10 +19,12 @@ package discover
import ( import (
"bytes" "bytes"
"crypto/ecdsa" "crypto/ecdsa"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"io" "io"
logpkg "log" logpkg "log"
"math/rand"
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
@ -138,6 +140,77 @@ func TestUDP_pingTimeout(t *testing.T) {
} }
} }
func TestUDP_responseTimeouts(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
defer test.table.Close()
rand.Seed(time.Now().UnixNano())
randomDuration := func(max time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(max)))
}
var (
nReqs = 200
nTimeouts = 0 // number of requests with ptype > 128
nilErr = make(chan error, nReqs) // for requests that get a reply
timeoutErr = make(chan error, nReqs) // for requests that time out
)
for i := 0; i < nReqs; i++ {
// Create a matcher for a random request in udp.loop. Requests
// with ptype <= 128 will not get a reply and should time out.
// For all other requests, a reply is scheduled to arrive
// within the timeout window.
p := &pending{
ptype: byte(rand.Intn(255)),
callback: func(interface{}) bool { return true },
}
binary.BigEndian.PutUint64(p.from[:], uint64(i))
if p.ptype <= 128 {
p.errc = timeoutErr
nTimeouts++
} else {
p.errc = nilErr
time.AfterFunc(randomDuration(60*time.Millisecond), func() {
if !test.udp.handleReply(p.from, p.ptype, nil) {
t.Logf("not matched: %v", p)
}
})
}
test.udp.addpending <- p
time.Sleep(randomDuration(30 * time.Millisecond))
}
// Check that all timeouts were delivered and that the rest got nil errors.
// The replies must be delivered.
var (
recvDeadline = time.After(20 * time.Second)
nTimeoutsRecv, nNil = 0, 0
)
for i := 0; i < nReqs; i++ {
select {
case err := <-timeoutErr:
if err != errTimeout {
t.Fatalf("got non-timeout error on timeoutErr %d: %v", i, err)
}
nTimeoutsRecv++
case err := <-nilErr:
if err != nil {
t.Fatalf("got non-nil error on nilErr %d: %v", i, err)
}
nNil++
case <-recvDeadline:
t.Fatalf("exceeded recv deadline")
}
}
if nTimeoutsRecv != nTimeouts {
t.Errorf("wrong number of timeout errors received: got %d, want %d", nTimeoutsRecv, nTimeouts)
}
if nNil != nReqs-nTimeouts {
t.Errorf("wrong number of successful replies: got %d, want %d", nNil, nReqs-nTimeouts)
}
}
func TestUDP_findnodeTimeout(t *testing.T) { func TestUDP_findnodeTimeout(t *testing.T) {
t.Parallel() t.Parallel()
test := newUDPTest(t) test := newUDPTest(t)

Loading…
Cancel
Save