p2p/discover: avoid dropping unverified nodes when table is almost empty (#21396)

This change improves discovery behavior in small networks. Very small
networks would often fail to bootstrap because all member nodes were
dropping table content due to findnode failure. The check is now changed
to avoid dropping nodes on findnode failure when their bucket is almost
empty. It also relaxes the liveness check requirement for FINDNODE/v4
response nodes, returning unverified nodes as results when there aren't
any verified nodes yet.

The "findnode failed" log now reports whether the node was dropped
instead of the number of results. The value of the "results" was
always zero by definition.

Co-authored-by: Felix Lange <fjl@twurst.com>
pull/21479/head
timcooijmans 4 years ago committed by GitHub
parent bdde616f23
commit 7b5107b73f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      p2p/discover/lookup.go
  2. 43
      p2p/discover/table.go
  3. 4
      p2p/discover/table_test.go
  4. 17
      p2p/discover/v4_udp.go
  5. 88
      p2p/discover/v4_udp_test.go

@ -104,9 +104,7 @@ func (it *lookup) startQueries() bool {
// The first query returns nodes from the local table. // The first query returns nodes from the local table.
if it.queries == -1 { if it.queries == -1 {
it.tab.mutex.Lock() closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
closest := it.tab.closest(it.result.target, bucketSize, false)
it.tab.mutex.Unlock()
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait // Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
// for the table to fill in this case, but there is no good mechanism for that // for the table to fill in this case, but there is no good mechanism for that
// yet. // yet.
@ -150,11 +148,14 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
} else if len(r) == 0 { } else if len(r) == 0 {
fails++ fails++
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails) it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
it.tab.log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "results", len(r), "err", err) // Remove the node from the local table if it fails to return anything useful too
if fails >= maxFindnodeFailures { // many times, but only if there are enough other nodes in the bucket.
it.tab.log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails) dropped := false
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
dropped = true
it.tab.delete(n) it.tab.delete(n)
} }
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
} else if fails > 0 { } else if fails > 0 {
// Reset failure counter because it counts _consecutive_ failures. // Reset failure counter because it counts _consecutive_ failures.
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0) it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)

@ -392,22 +392,35 @@ func (tab *Table) copyLiveNodes() {
} }
} }
// closest returns the n nodes in the table that are closest to the // findnodeByID returns the n nodes in the table that are closest to the given id.
// given id. The caller must hold tab.mutex. // This is used by the FINDNODE/v4 handler.
func (tab *Table) closest(target enode.ID, nresults int, checklive bool) *nodesByDistance { //
// This is a very wasteful way to find the closest nodes but // The preferLive parameter says whether the caller wants liveness-checked results. If
// obviously correct. I believe that tree-based buckets would make // preferLive is true and the table contains any verified nodes, the result will not
// this easier to implement efficiently. // contain unverified nodes. However, if there are no verified nodes at all, the result
close := &nodesByDistance{target: target} // will contain unverified nodes.
func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance {
tab.mutex.Lock()
defer tab.mutex.Unlock()
// Scan all buckets. There might be a better way to do this, but there aren't that many
// buckets, so this solution should be fine. The worst-case complexity of this loop
// is O(tab.len() * nresults).
nodes := &nodesByDistance{target: target}
liveNodes := &nodesByDistance{target: target}
for _, b := range &tab.buckets { for _, b := range &tab.buckets {
for _, n := range b.entries { for _, n := range b.entries {
if checklive && n.livenessChecks == 0 { nodes.push(n, nresults)
continue if preferLive && n.livenessChecks > 0 {
liveNodes.push(n, nresults)
}
} }
close.push(n, nresults)
} }
if preferLive && len(liveNodes.entries) > 0 {
return liveNodes
} }
return close return nodes
} }
// len returns the number of nodes in the table. // len returns the number of nodes in the table.
@ -421,6 +434,14 @@ func (tab *Table) len() (n int) {
return n return n
} }
// bucketLen returns the number of nodes in the bucket for the given ID.
func (tab *Table) bucketLen(id enode.ID) int {
tab.mutex.Lock()
defer tab.mutex.Unlock()
return len(tab.bucket(id).entries)
}
// bucket returns the bucket for the given node ID hash. // bucket returns the bucket for the given node ID hash.
func (tab *Table) bucket(id enode.ID) *bucket { func (tab *Table) bucket(id enode.ID) *bucket {
d := enode.LogDist(tab.self().ID(), id) d := enode.LogDist(tab.self().ID(), id)

@ -190,7 +190,7 @@ func checkIPLimitInvariant(t *testing.T, tab *Table) {
} }
} }
func TestTable_closest(t *testing.T) { func TestTable_findnodeByID(t *testing.T) {
t.Parallel() t.Parallel()
test := func(test *closeTest) bool { test := func(test *closeTest) bool {
@ -202,7 +202,7 @@ func TestTable_closest(t *testing.T) {
fillTable(tab, test.All) fillTable(tab, test.All)
// check that closest(Target, N) returns nodes // check that closest(Target, N) returns nodes
result := tab.closest(test.Target, test.N, false).entries result := tab.findnodeByID(test.Target, test.N, false).entries
if hasDuplicates(result) { if hasDuplicates(result) {
t.Errorf("result contains duplicates") t.Errorf("result contains duplicates")
return false return false

@ -324,7 +324,16 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
Target: target, Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()), Expiration: uint64(time.Now().Add(expiration).Unix()),
}) })
return nodes, <-rm.errc // Ensure that callers don't see a timeout if the node actually responded. Since
// findnode can receive more than one neighbors response, the reply matcher will be
// active until the remote node sends enough nodes. If the remote end doesn't have
// enough nodes the reply matcher will time out waiting for the second reply, but
// there's no need for an error in that case.
err := <-rm.errc
if err == errTimeout && rm.reply != nil {
err = nil
}
return nodes, err
} }
// RequestENR sends enrRequest to the given node and waits for a response. // RequestENR sends enrRequest to the given node and waits for a response.
@ -453,9 +462,9 @@ func (t *UDPv4) loop() {
if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) { if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
ok, requestDone := p.callback(r.data) ok, requestDone := p.callback(r.data)
matched = matched || ok matched = matched || ok
p.reply = r.data
// Remove the matcher if callback indicates that all replies have been received. // Remove the matcher if callback indicates that all replies have been received.
if requestDone { if requestDone {
p.reply = r.data
p.errc <- nil p.errc <- nil
plist.Remove(el) plist.Remove(el)
} }
@ -715,9 +724,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno
// Determine closest nodes. // Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:])) target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
t.tab.mutex.Lock() closest := t.tab.findnodeByID(target, bucketSize, true).entries
closest := t.tab.closest(target, bucketSize, true).entries
t.tab.mutex.Unlock()
// Send neighbors in chunks with at most maxNeighbors per packet // Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the packet size limit. // to stay below the packet size limit.

@ -22,6 +22,7 @@ import (
crand "crypto/rand" crand "crypto/rand"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt"
"io" "io"
"math/rand" "math/rand"
"net" "net"
@ -277,7 +278,7 @@ func TestUDPv4_findnode(t *testing.T) {
test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now()) test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())
// check that closest neighbors are returned. // check that closest neighbors are returned.
expected := test.table.closest(testTarget.ID(), bucketSize, true) expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp}) test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
waitNeighbors := func(want []*node) { waitNeighbors := func(want []*node) {
test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) { test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
@ -493,6 +494,91 @@ func TestUDPv4_EIP868(t *testing.T) {
}) })
} }
// This test verifies that a small network of nodes can boot up into a healthy state.
func TestUDPv4_smallNetConvergence(t *testing.T) {
t.Parallel()
// Start the network.
nodes := make([]*UDPv4, 4)
for i := range nodes {
var cfg Config
if i > 0 {
bn := nodes[0].Self()
cfg.Bootnodes = []*enode.Node{bn}
}
nodes[i] = startLocalhostV4(t, cfg)
defer nodes[i].Close()
}
// Run through the iterator on all nodes until
// they have all found each other.
status := make(chan error, len(nodes))
for i := range nodes {
node := nodes[i]
go func() {
found := make(map[enode.ID]bool, len(nodes))
it := node.RandomNodes()
for it.Next() {
found[it.Node().ID()] = true
if len(found) == len(nodes) {
status <- nil
return
}
}
status <- fmt.Errorf("node %s didn't find all nodes", node.Self().ID().TerminalString())
}()
}
// Wait for all status reports.
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()
for received := 0; received < len(nodes); {
select {
case <-timeout.C:
for _, node := range nodes {
node.Close()
}
case err := <-status:
received++
if err != nil {
t.Error("ERROR:", err)
return
}
}
}
}
func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
t.Helper()
cfg.PrivateKey = newkey()
db, _ := enode.OpenDB("")
ln := enode.NewLocalNode(db, cfg.PrivateKey)
// Prefix logs with node ID.
lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
lfmt := log.TerminalFormat(false)
cfg.Log = testlog.Logger(t, log.LvlTrace)
cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error {
t.Logf("%s %s", lprefix, lfmt.Format(r))
return nil
}))
// Listen.
socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
if err != nil {
t.Fatal(err)
}
realaddr := socket.LocalAddr().(*net.UDPAddr)
ln.SetStaticIP(realaddr.IP)
ln.SetFallbackUDP(realaddr.Port)
udp, err := ListenV4(socket, ln, cfg)
if err != nil {
t.Fatal(err)
}
return udp
}
// dgramPipe is a fake UDP socket. It queues all sent datagrams. // dgramPipe is a fake UDP socket. It queues all sent datagrams.
type dgramPipe struct { type dgramPipe struct {
mu *sync.Mutex mu *sync.Mutex

Loading…
Cancel
Save