From 6a9158bb1bea9feda96489f1d02e15f0a9f93b82 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 23 May 2024 14:26:09 +0200 Subject: [PATCH] p2p/discover: improved node revalidation (#29572) Node discovery periodically revalidates the nodes in its table by sending PING, checking if they are still alive. I recently noticed some issues with the implementation of this process, which can cause strange results such as nodes dropping unexpectedly, certain nodes not getting revalidated often enough, and bad results being returned to incoming FINDNODE queries. In this change, the revalidation process is improved with the following logic: - We maintain two 'revalidation lists' containing the table nodes, named 'fast' and 'slow'. - The process chooses random nodes from each list on a randomized interval, the interval being faster for the 'fast' list, and performs revalidation for the chosen node. - Whenever a node is newly inserted into the table, it goes into the 'fast' list. Once validation passes, it transfers to the 'slow' list. If a request fails, or the node changes endpoint, it transfers back into 'fast'. - livenessChecks is incremented by one for successful checks. Unlike the old implementation, we will not drop the node on the first failing check. We instead quickly decay the livenessChecks to give it another chance. - Order of nodes in bucket doesn't matter anymore. I am also adding a debug API endpoint to dump the node table content. 
Co-authored-by: Martin HS --- cmd/devp2p/discv4cmd.go | 57 +++ internal/testlog/testlog.go | 2 +- node/api.go | 17 + p2p/discover/common.go | 47 ++- p2p/discover/lookup.go | 29 +- p2p/discover/node.go | 20 +- p2p/discover/table.go | 707 +++++++++++++++----------------- p2p/discover/table_reval.go | 223 ++++++++++ p2p/discover/table_test.go | 181 ++++---- p2p/discover/table_util_test.go | 118 +++++- p2p/discover/v4_udp.go | 10 +- p2p/discover/v4_udp_test.go | 2 +- p2p/discover/v5_udp.go | 4 +- p2p/discover/v5_udp_test.go | 2 +- p2p/server.go | 36 +- 15 files changed, 931 insertions(+), 524 deletions(-) create mode 100644 p2p/discover/table_reval.go diff --git a/cmd/devp2p/discv4cmd.go b/cmd/devp2p/discv4cmd.go index 45bcdcd367..3b5400ca3a 100644 --- a/cmd/devp2p/discv4cmd.go +++ b/cmd/devp2p/discv4cmd.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "net" + "net/http" "strconv" "strings" "time" @@ -28,9 +29,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/internal/flags" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" "github.com/urfave/cli/v2" ) @@ -45,6 +48,7 @@ var ( discv4ResolveJSONCommand, discv4CrawlCommand, discv4TestCommand, + discv4ListenCommand, }, } discv4PingCommand = &cli.Command{ @@ -75,6 +79,14 @@ var ( Flags: discoveryNodeFlags, ArgsUsage: "", } + discv4ListenCommand = &cli.Command{ + Name: "listen", + Usage: "Runs a discovery node", + Action: discv4Listen, + Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{ + httpAddrFlag, + }), + } discv4CrawlCommand = &cli.Command{ Name: "crawl", Usage: "Updates a nodes.json file with random nodes found in the DHT", @@ -131,6 +143,10 @@ var ( Usage: "Enode of the remote node under test", EnvVars: []string{"REMOTE_ENODE"}, } + httpAddrFlag = &cli.StringFlag{ + Name: "rpc", + 
Usage: "HTTP server listening address", + } ) var discoveryNodeFlags = []cli.Flag{ @@ -154,6 +170,27 @@ func discv4Ping(ctx *cli.Context) error { return nil } +func discv4Listen(ctx *cli.Context) error { + disc, _ := startV4(ctx) + defer disc.Close() + + fmt.Println(disc.Self()) + + httpAddr := ctx.String(httpAddrFlag.Name) + if httpAddr == "" { + // Non-HTTP mode. + select {} + } + + api := &discv4API{disc} + log.Info("Starting RPC API server", "addr", httpAddr) + srv := rpc.NewServer() + srv.RegisterName("discv4", api) + http.DefaultServeMux.Handle("/", srv) + httpsrv := http.Server{Addr: httpAddr, Handler: http.DefaultServeMux} + return httpsrv.ListenAndServe() +} + func discv4RequestRecord(ctx *cli.Context) error { n := getNodeArg(ctx) disc, _ := startV4(ctx) @@ -362,3 +399,23 @@ func parseBootnodes(ctx *cli.Context) ([]*enode.Node, error) { } return nodes, nil } + +type discv4API struct { + host *discover.UDPv4 +} + +func (api *discv4API) LookupRandom(n int) (ns []*enode.Node) { + it := api.host.RandomNodes() + for len(ns) < n && it.Next() { + ns = append(ns, it.Node()) + } + return ns +} + +func (api *discv4API) Buckets() [][]discover.BucketNode { + return api.host.TableBuckets() +} + +func (api *discv4API) Self() *enode.Node { + return api.host.Self() +} diff --git a/internal/testlog/testlog.go b/internal/testlog/testlog.go index 3740dd1f24..ad61af9eac 100644 --- a/internal/testlog/testlog.go +++ b/internal/testlog/testlog.go @@ -58,7 +58,7 @@ func (h *bufHandler) Handle(_ context.Context, r slog.Record) error { } func (h *bufHandler) Enabled(_ context.Context, lvl slog.Level) bool { - return lvl <= h.level + return lvl >= h.level } func (h *bufHandler) WithAttrs(attrs []slog.Attr) slog.Handler { diff --git a/node/api.go b/node/api.go index a71ae6aa29..33dfb3a1cc 100644 --- a/node/api.go +++ b/node/api.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + 
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" ) @@ -39,6 +40,9 @@ func (n *Node) apis() []rpc.API { }, { Namespace: "debug", Service: debug.Handler, + }, { + Namespace: "debug", + Service: &p2pDebugAPI{n}, }, { Namespace: "web3", Service: &web3API{n}, @@ -333,3 +337,16 @@ func (s *web3API) ClientVersion() string { func (s *web3API) Sha3(input hexutil.Bytes) hexutil.Bytes { return crypto.Keccak256(input) } + +// p2pDebugAPI provides access to p2p internals for debugging. +type p2pDebugAPI struct { + stack *Node +} + +func (s *p2pDebugAPI) DiscoveryV4Table() [][]discover.BucketNode { + disc := s.stack.server.DiscoveryV4() + if disc != nil { + return disc.TableBuckets() + } + return nil +} diff --git a/p2p/discover/common.go b/p2p/discover/common.go index 1f763904bb..bebea8cc38 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -18,7 +18,11 @@ package discover import ( "crypto/ecdsa" + crand "crypto/rand" + "encoding/binary" + "math/rand" "net" + "sync" "time" "github.com/ethereum/go-ethereum/common/mclock" @@ -62,7 +66,7 @@ type Config struct { func (cfg Config) withDefaults() Config { // Node table configuration: if cfg.PingInterval == 0 { - cfg.PingInterval = 10 * time.Second + cfg.PingInterval = 3 * time.Second } if cfg.RefreshInterval == 0 { cfg.RefreshInterval = 30 * time.Minute @@ -92,3 +96,44 @@ type ReadPacket struct { Data []byte Addr *net.UDPAddr } + +type randomSource interface { + Intn(int) int + Int63n(int64) int64 + Shuffle(int, func(int, int)) +} + +// reseedingRandom is a random number generator that tracks when it was last re-seeded. 
+type reseedingRandom struct { + mu sync.Mutex + cur *rand.Rand +} + +func (r *reseedingRandom) seed() { + var b [8]byte + crand.Read(b[:]) + seed := binary.BigEndian.Uint64(b[:]) + new := rand.New(rand.NewSource(int64(seed))) + + r.mu.Lock() + r.cur = new + r.mu.Unlock() +} + +func (r *reseedingRandom) Intn(n int) int { + r.mu.Lock() + defer r.mu.Unlock() + return r.cur.Intn(n) +} + +func (r *reseedingRandom) Int63n(n int64) int64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.cur.Int63n(n) +} + +func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) { + r.mu.Lock() + defer r.mu.Unlock() + r.cur.Shuffle(n, swap) +} diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index b8d97b44e1..5c3d90d6c9 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -140,32 +140,13 @@ func (it *lookup) slowdown() { } func (it *lookup) query(n *node, reply chan<- []*node) { - fails := it.tab.db.FindFails(n.ID(), n.IP()) r, err := it.queryfunc(n) - if errors.Is(err, errClosed) { - // Avoid recording failures on shutdown. - reply <- nil - return - } else if len(r) == 0 { - fails++ - it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails) - // Remove the node from the local table if it fails to return anything useful too - // many times, but only if there are enough other nodes in the bucket. - dropped := false - if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 { - dropped = true - it.tab.delete(n) + if !errors.Is(err, errClosed) { // avoid recording failures on shutdown. + success := len(r) > 0 + it.tab.trackRequest(n, success, r) + if err != nil { + it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "err", err) } - it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err) - } else if fails > 0 { - // Reset failure counter because it counts _consecutive_ failures. - it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0) - } - - // Grab as many nodes as possible. 
Some of them might not be alive anymore, but we'll - // just remove those again during revalidation. - for _, n := range r { - it.tab.addSeenNode(n) } reply <- r } diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 9ffe101ccf..47df09e883 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -29,12 +29,22 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" ) +type BucketNode struct { + Node *enode.Node `json:"node"` + AddedToTable time.Time `json:"addedToTable"` + AddedToBucket time.Time `json:"addedToBucket"` + Checks int `json:"checks"` + Live bool `json:"live"` +} + // node represents a host on the network. // The fields of Node may not be modified. type node struct { - enode.Node - addedAt time.Time // time when the node was added to the table - livenessChecks uint // how often liveness was checked + *enode.Node + addedToTable time.Time // first time node was added to bucket or replacement list + addedToBucket time.Time // time it was added in the actual bucket + livenessChecks uint // how often liveness was checked + isValidatedLive bool // true if existence of node is considered validated right now } type encPubkey [64]byte @@ -65,7 +75,7 @@ func (e encPubkey) id() enode.ID { } func wrapNode(n *enode.Node) *node { - return &node{Node: *n} + return &node{Node: n} } func wrapNodes(ns []*enode.Node) []*node { @@ -77,7 +87,7 @@ func wrapNodes(ns []*enode.Node) []*node { } func unwrapNode(n *node) *enode.Node { - return &n.Node + return n.Node } func unwrapNodes(ns []*node) []*enode.Node { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 2b7a28708b..74c0e930e4 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -24,16 +24,15 @@ package discover import ( "context" - crand "crypto/rand" - "encoding/binary" "fmt" - mrand "math/rand" "net" + "slices" "sort" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" 
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" @@ -55,21 +54,21 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - copyNodesInterval = 30 * time.Second - seedMinTableTime = 5 * time.Minute - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + seedMinTableTime = 5 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps // itself up-to-date by verifying the liveness of neighbors and requesting their node // records when announcements of a new record version are received. type Table struct { - mutex sync.Mutex // protects buckets, bucket content, nursery, rand - buckets [nBuckets]*bucket // index of known nodes by distance - nursery []*node // bootstrap nodes - rand *mrand.Rand // source of randomness, periodically reseeded - ips netutil.DistinctNetSet + mutex sync.Mutex // protects buckets, bucket content, nursery, rand + buckets [nBuckets]*bucket // index of known nodes by distance + nursery []*node // bootstrap nodes + rand reseedingRandom // source of randomness, periodically reseeded + ips netutil.DistinctNetSet + revalidation tableRevalidation db *enode.DB // database of known nodes net transport @@ -77,10 +76,14 @@ type Table struct { log log.Logger // loop channels - refreshReq chan chan struct{} - initDone chan struct{} - closeReq chan struct{} - closed chan struct{} + refreshReq chan chan struct{} + revalResponseCh chan revalidationResponse + addNodeCh chan addNodeOp + addNodeHandled chan bool + trackRequestCh chan trackRequestOp + initDone chan struct{} + closeReq chan struct{} + closed chan struct{} nodeAddedHook func(*bucket, *node) nodeRemovedHook func(*bucket, *node) @@ -104,22 +107,33 @@ type bucket struct { index int } +type addNodeOp struct { + node *node + isInbound bool +} + +type trackRequestOp struct { + node *node + foundNodes []*node + 
success bool +} + func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { cfg = cfg.withDefaults() tab := &Table{ - net: t, - db: db, - cfg: cfg, - log: cfg.Log, - refreshReq: make(chan chan struct{}), - initDone: make(chan struct{}), - closeReq: make(chan struct{}), - closed: make(chan struct{}), - rand: mrand.New(mrand.NewSource(0)), - ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - } - if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil { - return nil, err + net: t, + db: db, + cfg: cfg, + log: cfg.Log, + refreshReq: make(chan chan struct{}), + revalResponseCh: make(chan revalidationResponse), + addNodeCh: make(chan addNodeOp), + addNodeHandled: make(chan bool), + trackRequestCh: make(chan trackRequestOp), + initDone: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), + ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, } for i := range tab.buckets { tab.buckets[i] = &bucket{ @@ -127,41 +141,34 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, } } - tab.seedRand() - tab.loadSeedNodes() - - return tab, nil -} + tab.rand.seed() + tab.revalidation.init(&cfg) -func newMeteredTable(t transport, db *enode.DB, cfg Config) (*Table, error) { - tab, err := newTable(t, db, cfg) - if err != nil { + // initial table content + if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil { return nil, err } - if metrics.Enabled { - tab.nodeAddedHook = func(b *bucket, n *node) { - bucketsCounter[b.index].Inc(1) - } - tab.nodeRemovedHook = func(b *bucket, n *node) { - bucketsCounter[b.index].Dec(1) - } - } + tab.loadSeedNodes() + return tab, nil } // Nodes returns all nodes contained in the table. 
-func (tab *Table) Nodes() []*enode.Node { - if !tab.isInitDone() { - return nil - } - +func (tab *Table) Nodes() [][]BucketNode { tab.mutex.Lock() defer tab.mutex.Unlock() - var nodes []*enode.Node - for _, b := range &tab.buckets { - for _, n := range b.entries { - nodes = append(nodes, unwrapNode(n)) + nodes := make([][]BucketNode, len(tab.buckets)) + for i, b := range &tab.buckets { + nodes[i] = make([]BucketNode, len(b.entries)) + for j, n := range b.entries { + nodes[i][j] = BucketNode{ + Node: n.Node, + Checks: int(n.livenessChecks), + Live: n.isValidatedLive, + AddedToTable: n.addedToTable, + AddedToBucket: n.addedToBucket, + } } } return nodes @@ -171,15 +178,6 @@ func (tab *Table) self() *enode.Node { return tab.net.Self() } -func (tab *Table) seedRand() { - var b [8]byte - crand.Read(b[:]) - - tab.mutex.Lock() - tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) - tab.mutex.Unlock() -} - // getNode returns the node with the given ID or nil if it isn't in the table. func (tab *Table) getNode(id enode.ID) *enode.Node { tab.mutex.Lock() @@ -239,52 +237,173 @@ func (tab *Table) refresh() <-chan struct{} { return done } -// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. +// findnodeByID returns the n nodes in the table that are closest to the given id. +// This is used by the FINDNODE/v4 handler. +// +// The preferLive parameter says whether the caller wants liveness-checked results. If +// preferLive is true and the table contains any verified nodes, the result will not +// contain unverified nodes. However, if there are no verified nodes at all, the result +// will contain unverified nodes. +func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + // Scan all buckets. There might be a better way to do this, but there aren't that many + // buckets, so this solution should be fine. 
The worst-case complexity of this loop + // is O(tab.len() * nresults). + nodes := &nodesByDistance{target: target} + liveNodes := &nodesByDistance{target: target} + for _, b := range &tab.buckets { + for _, n := range b.entries { + nodes.push(n, nresults) + if preferLive && n.isValidatedLive { + liveNodes.push(n, nresults) + } + } + } + + if preferLive && len(liveNodes.entries) > 0 { + return liveNodes + } + return nodes +} + +// appendLiveNodes adds nodes at the given distance to the result slice. +// This is used by the FINDNODE/v5 handler. +func (tab *Table) appendLiveNodes(dist uint, result []*enode.Node) []*enode.Node { + if dist > 256 { + return result + } + if dist == 0 { + return append(result, tab.self()) + } + + tab.mutex.Lock() + for _, n := range tab.bucketAtDistance(int(dist)).entries { + if n.isValidatedLive { + result = append(result, n.Node) + } + } + tab.mutex.Unlock() + + // Shuffle result to avoid always returning same nodes in FINDNODE/v5. + tab.rand.Shuffle(len(result), func(i, j int) { + result[i], result[j] = result[j], result[i] + }) + return result +} + +// len returns the number of nodes in the table. +func (tab *Table) len() (n int) { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + for _, b := range &tab.buckets { + n += len(b.entries) + } + return n +} + +// addFoundNode adds a node which may not be live. If the bucket has space available, +// adding the node succeeds immediately. Otherwise, the node is added to the replacements +// list. +// +// The caller must not hold tab.mutex. +func (tab *Table) addFoundNode(n *node) bool { + op := addNodeOp{node: n, isInbound: false} + select { + case tab.addNodeCh <- op: + return <-tab.addNodeHandled + case <-tab.closeReq: + return false + } +} + +// addInboundNode adds a node from an inbound contact. If the bucket has no space, the +// node is added to the replacements list. +// +// There is an additional safety measure: if the table is still initializing the node is +// not added. 
This prevents an attack where the table could be filled by just sending ping +// repeatedly. +// +// The caller must not hold tab.mutex. +func (tab *Table) addInboundNode(n *node) bool { + op := addNodeOp{node: n, isInbound: true} + select { + case tab.addNodeCh <- op: + return <-tab.addNodeHandled + case <-tab.closeReq: + return false + } +} + +func (tab *Table) trackRequest(n *node, success bool, foundNodes []*node) { + op := trackRequestOp{n, foundNodes, success} + select { + case tab.trackRequestCh <- op: + case <-tab.closeReq: + } +} + +// loop is the main loop of Table. func (tab *Table) loop() { var ( - revalidate = time.NewTimer(tab.nextRevalidateTime()) - refresh = time.NewTimer(tab.nextRefreshTime()) - copyNodes = time.NewTicker(copyNodesInterval) - refreshDone = make(chan struct{}) // where doRefresh reports completion - revalidateDone chan struct{} // where doRevalidate reports completion - waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs + refresh = time.NewTimer(tab.nextRefreshTime()) + refreshDone = make(chan struct{}) // where doRefresh reports completion + waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs + revalTimer = mclock.NewAlarm(tab.cfg.Clock) + reseedRandTimer = time.NewTicker(10 * time.Minute) ) defer refresh.Stop() - defer revalidate.Stop() - defer copyNodes.Stop() + defer revalTimer.Stop() + defer reseedRandTimer.Stop() // Start initial refresh. 
go tab.doRefresh(refreshDone) loop: for { + nextTime := tab.revalidation.run(tab, tab.cfg.Clock.Now()) + revalTimer.Schedule(nextTime) + select { + case <-reseedRandTimer.C: + tab.rand.seed() + + case <-revalTimer.C(): + + case r := <-tab.revalResponseCh: + tab.revalidation.handleResponse(tab, r) + + case op := <-tab.addNodeCh: + tab.mutex.Lock() + ok := tab.handleAddNode(op) + tab.mutex.Unlock() + tab.addNodeHandled <- ok + + case op := <-tab.trackRequestCh: + tab.handleTrackRequest(op) + case <-refresh.C: - tab.seedRand() if refreshDone == nil { refreshDone = make(chan struct{}) go tab.doRefresh(refreshDone) } + case req := <-tab.refreshReq: waiting = append(waiting, req) if refreshDone == nil { refreshDone = make(chan struct{}) go tab.doRefresh(refreshDone) } + case <-refreshDone: for _, ch := range waiting { close(ch) } waiting, refreshDone = nil, nil refresh.Reset(tab.nextRefreshTime()) - case <-revalidate.C: - revalidateDone = make(chan struct{}) - go tab.doRevalidate(revalidateDone) - case <-revalidateDone: - revalidate.Reset(tab.nextRevalidateTime()) - revalidateDone = nil - case <-copyNodes.C: - go tab.copyLiveNodes() + case <-tab.closeReq: break loop } @@ -296,9 +415,6 @@ loop: for _, ch := range waiting { close(ch) } - if revalidateDone != nil { - <-revalidateDone - } close(tab.closed) } @@ -335,169 +451,15 @@ func (tab *Table) loadSeedNodes() { age := time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) } - tab.addSeenNode(seed) + tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) } } -// doRevalidate checks that the last node in a random bucket is still live and replaces or -// deletes the node if it isn't. -func (tab *Table) doRevalidate(done chan<- struct{}) { - defer func() { done <- struct{}{} }() - - last, bi := tab.nodeToRevalidate() - if last == nil { - // No non-empty bucket found. 
- return - } - - // Ping the selected node and wait for a pong. - remoteSeq, err := tab.net.ping(unwrapNode(last)) - - // Also fetch record if the node replied and returned a higher sequence number. - if last.Seq() < remoteSeq { - n, err := tab.net.RequestENR(unwrapNode(last)) - if err != nil { - tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) - } else { - last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} - } - } - - tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.buckets[bi] - if err == nil { - // The node responded, move it to the front. - last.livenessChecks++ - tab.log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) - tab.bumpInBucket(b, last) - return - } - // No reply received, pick a replacement or delete the node if there aren't - // any replacements. - if r := tab.replace(b, last); r != nil { - tab.log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP()) - } else { - tab.log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks) - } -} - -// nodeToRevalidate returns the last node in a random, non-empty bucket. 
-func (tab *Table) nodeToRevalidate() (n *node, bi int) { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - for _, bi = range tab.rand.Perm(len(tab.buckets)) { - b := tab.buckets[bi] - if len(b.entries) > 0 { - last := b.entries[len(b.entries)-1] - return last, bi - } - } - return nil, 0 -} - -func (tab *Table) nextRevalidateTime() time.Duration { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - return time.Duration(tab.rand.Int63n(int64(tab.cfg.PingInterval))) -} - func (tab *Table) nextRefreshTime() time.Duration { - tab.mutex.Lock() - defer tab.mutex.Unlock() - half := tab.cfg.RefreshInterval / 2 return half + time.Duration(tab.rand.Int63n(int64(half))) } -// copyLiveNodes adds nodes from the table to the database if they have been in the table -// longer than seedMinTableTime. -func (tab *Table) copyLiveNodes() { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - now := time.Now() - for _, b := range &tab.buckets { - for _, n := range b.entries { - if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime { - tab.db.UpdateNode(unwrapNode(n)) - } - } - } -} - -// findnodeByID returns the n nodes in the table that are closest to the given id. -// This is used by the FINDNODE/v4 handler. -// -// The preferLive parameter says whether the caller wants liveness-checked results. If -// preferLive is true and the table contains any verified nodes, the result will not -// contain unverified nodes. However, if there are no verified nodes at all, the result -// will contain unverified nodes. -func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - // Scan all buckets. There might be a better way to do this, but there aren't that many - // buckets, so this solution should be fine. The worst-case complexity of this loop - // is O(tab.len() * nresults). 
- nodes := &nodesByDistance{target: target} - liveNodes := &nodesByDistance{target: target} - for _, b := range &tab.buckets { - for _, n := range b.entries { - nodes.push(n, nresults) - if preferLive && n.livenessChecks > 0 { - liveNodes.push(n, nresults) - } - } - } - - if preferLive && len(liveNodes.entries) > 0 { - return liveNodes - } - return nodes -} - -// appendLiveNodes adds nodes at the given distance to the result slice. -func (tab *Table) appendLiveNodes(dist uint, result []*enode.Node) []*enode.Node { - if dist > 256 { - return result - } - if dist == 0 { - return append(result, tab.self()) - } - - tab.mutex.Lock() - defer tab.mutex.Unlock() - for _, n := range tab.bucketAtDistance(int(dist)).entries { - if n.livenessChecks >= 1 { - node := n.Node // avoid handing out pointer to struct field - result = append(result, &node) - } - } - return result -} - -// len returns the number of nodes in the table. -func (tab *Table) len() (n int) { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - for _, b := range &tab.buckets { - n += len(b.entries) - } - return n -} - -// bucketLen returns the number of nodes in the bucket for the given ID. -func (tab *Table) bucketLen(id enode.ID) int { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - return len(tab.bucket(id).entries) -} - // bucket returns the bucket for the given node ID hash. func (tab *Table) bucket(id enode.ID) *bucket { d := enode.LogDist(tab.self().ID(), id) @@ -511,95 +473,6 @@ func (tab *Table) bucketAtDistance(d int) *bucket { return tab.buckets[d-bucketMinDistance-1] } -// addSeenNode adds a node which may or may not be live to the end of a bucket. If the -// bucket has space available, adding the node succeeds immediately. Otherwise, the node is -// added to the replacements list. -// -// The caller must not hold tab.mutex. 
-func (tab *Table) addSeenNode(n *node) { - if n.ID() == tab.self().ID() { - return - } - - tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.bucket(n.ID()) - if contains(b.entries, n.ID()) { - // Already in bucket, don't add. - return - } - if len(b.entries) >= bucketSize { - // Bucket full, maybe add as replacement. - tab.addReplacement(b, n) - return - } - if !tab.addIP(b, n.IP()) { - // Can't add: IP limit reached. - return - } - - // Add to end of bucket: - b.entries = append(b.entries, n) - b.replacements = deleteNode(b.replacements, n) - n.addedAt = time.Now() - - if tab.nodeAddedHook != nil { - tab.nodeAddedHook(b, n) - } -} - -// addVerifiedNode adds a node whose existence has been verified recently to the front of a -// bucket. If the node is already in the bucket, it is moved to the front. If the bucket -// has no space, the node is added to the replacements list. -// -// There is an additional safety measure: if the table is still initializing the node -// is not added. This prevents an attack where the table could be filled by just sending -// ping repeatedly. -// -// The caller must not hold tab.mutex. -func (tab *Table) addVerifiedNode(n *node) { - if !tab.isInitDone() { - return - } - if n.ID() == tab.self().ID() { - return - } - - tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.bucket(n.ID()) - if tab.bumpInBucket(b, n) { - // Already in bucket, moved to front. - return - } - if len(b.entries) >= bucketSize { - // Bucket full, maybe add as replacement. - tab.addReplacement(b, n) - return - } - if !tab.addIP(b, n.IP()) { - // Can't add: IP limit reached. - return - } - - // Add to front of bucket. - b.entries, _ = pushNode(b.entries, n, bucketSize) - b.replacements = deleteNode(b.replacements, n) - n.addedAt = time.Now() - - if tab.nodeAddedHook != nil { - tab.nodeAddedHook(b, n) - } -} - -// delete removes an entry from the node table. It is used to evacuate dead nodes. 
-func (tab *Table) delete(node *node) { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - tab.deleteInBucket(tab.bucket(node.ID()), node) -} - func (tab *Table) addIP(b *bucket, ip net.IP) bool { if len(ip) == 0 { return false // Nodes without IP cannot be added. @@ -627,15 +500,51 @@ func (tab *Table) removeIP(b *bucket, ip net.IP) { b.ips.Remove(ip) } +// handleAddNode adds the node in the request to the table, if there is space. +// The caller must hold tab.mutex. +func (tab *Table) handleAddNode(req addNodeOp) bool { + if req.node.ID() == tab.self().ID() { + return false + } + // For nodes from inbound contact, there is an additional safety measure: if the table + // is still initializing the node is not added. + if req.isInbound && !tab.isInitDone() { + return false + } + + b := tab.bucket(req.node.ID()) + if tab.bumpInBucket(b, req.node.Node) { + // Already in bucket, update record. + return false + } + if len(b.entries) >= bucketSize { + // Bucket full, maybe add as replacement. + tab.addReplacement(b, req.node) + return false + } + if !tab.addIP(b, req.node.IP()) { + // Can't add: IP limit reached. + return false + } + + // Add to bucket. + b.entries = append(b.entries, req.node) + b.replacements = deleteNode(b.replacements, req.node) + tab.nodeAdded(b, req.node) + return true +} + +// addReplacement adds n to the replacement cache of bucket b. func (tab *Table) addReplacement(b *bucket, n *node) { - for _, e := range b.replacements { - if e.ID() == n.ID() { - return // already in list - } + if contains(b.replacements, n.ID()) { + // TODO: update ENR + return } if !tab.addIP(b, n.IP()) { return } + + n.addedToTable = time.Now() var removed *node b.replacements, removed = pushNode(b.replacements, n, maxReplacements) if removed != nil { @@ -643,59 +552,107 @@ func (tab *Table) addReplacement(b *bucket, n *node) { } } -// replace removes n from the replacement list and replaces 'last' with it if it is the -// last entry in the bucket. 
If 'last' isn't the last entry, it has either been replaced -// with someone else or became active. -func (tab *Table) replace(b *bucket, last *node) *node { - if len(b.entries) == 0 || b.entries[len(b.entries)-1].ID() != last.ID() { - // Entry has moved, don't replace it. +func (tab *Table) nodeAdded(b *bucket, n *node) { + if n.addedToTable == (time.Time{}) { + n.addedToTable = time.Now() + } + n.addedToBucket = time.Now() + tab.revalidation.nodeAdded(tab, n) + if tab.nodeAddedHook != nil { + tab.nodeAddedHook(b, n) + } + if metrics.Enabled { + bucketsCounter[b.index].Inc(1) + } +} + +func (tab *Table) nodeRemoved(b *bucket, n *node) { + tab.revalidation.nodeRemoved(n) + if tab.nodeRemovedHook != nil { + tab.nodeRemovedHook(b, n) + } + if metrics.Enabled { + bucketsCounter[b.index].Dec(1) + } +} + +// deleteInBucket removes node n from the table. +// If there are replacement nodes in the bucket, the node is replaced. +func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *node { + index := slices.IndexFunc(b.entries, func(e *node) bool { return e.ID() == id }) + if index == -1 { + // Entry has been removed already. return nil } - // Still the last entry. + + // Remove the node. + n := b.entries[index] + b.entries = slices.Delete(b.entries, index, index+1) + tab.removeIP(b, n.IP()) + tab.nodeRemoved(b, n) + + // Add replacement. if len(b.replacements) == 0 { - tab.deleteInBucket(b, last) + tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IP()) return nil } - r := b.replacements[tab.rand.Intn(len(b.replacements))] - b.replacements = deleteNode(b.replacements, r) - b.entries[len(b.entries)-1] = r - tab.removeIP(b, last.IP()) - return r -} - -// bumpInBucket moves the given node to the front of the bucket entry list -// if it is contained in that list. 
-func (tab *Table) bumpInBucket(b *bucket, n *node) bool { - for i := range b.entries { - if b.entries[i].ID() == n.ID() { - if !n.IP().Equal(b.entries[i].IP()) { - // Endpoint has changed, ensure that the new IP fits into table limits. - tab.removeIP(b, b.entries[i].IP()) - if !tab.addIP(b, n.IP()) { - // It doesn't, put the previous one back. - tab.addIP(b, b.entries[i].IP()) - return false - } - } - // Move it to the front. - copy(b.entries[1:], b.entries[:i]) - b.entries[0] = n - return true + rindex := tab.rand.Intn(len(b.replacements)) + rep := b.replacements[rindex] + b.replacements = slices.Delete(b.replacements, rindex, rindex+1) + b.entries = append(b.entries, rep) + tab.nodeAdded(b, rep) + tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IP(), "r", rep.ID(), "rip", rep.IP()) + return rep +} + +// bumpInBucket updates the node record of n in the bucket. +func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node) bool { + i := slices.IndexFunc(b.entries, func(elem *node) bool { + return elem.ID() == newRecord.ID() + }) + if i == -1 { + return false + } + + if !newRecord.IP().Equal(b.entries[i].IP()) { + // Endpoint has changed, ensure that the new IP fits into table limits. + tab.removeIP(b, b.entries[i].IP()) + if !tab.addIP(b, newRecord.IP()) { + // It doesn't, put the previous one back. + tab.addIP(b, b.entries[i].IP()) + return false } } - return false + b.entries[i].Node = newRecord + return true } -func (tab *Table) deleteInBucket(b *bucket, n *node) { - // Check if the node is actually in the bucket so the removed hook - // isn't called multiple times for the same node. - if !contains(b.entries, n.ID()) { - return +func (tab *Table) handleTrackRequest(op trackRequestOp) { + var fails int + if op.success { + // Reset failure counter because it counts _consecutive_ failures. 
+ tab.db.UpdateFindFails(op.node.ID(), op.node.IP(), 0) + } else { + fails = tab.db.FindFails(op.node.ID(), op.node.IP()) + fails++ + tab.db.UpdateFindFails(op.node.ID(), op.node.IP(), fails) } - b.entries = deleteNode(b.entries, n) - tab.removeIP(b, n.IP()) - if tab.nodeRemovedHook != nil { - tab.nodeRemovedHook(b, n) + + tab.mutex.Lock() + defer tab.mutex.Unlock() + + b := tab.bucket(op.node.ID()) + // Remove the node from the local table if it fails to return anything useful too + // many times, but only if there are enough other nodes in the bucket. This latter + // condition specifically exists to make bootstrapping in smaller test networks more + // reliable. + if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 { + tab.deleteInBucket(b, op.node.ID()) + } + + // Add found nodes. + for _, n := range op.foundNodes { + tab.handleAddNode(addNodeOp{n, false}) } } diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go new file mode 100644 index 0000000000..9a13900ebc --- /dev/null +++ b/p2p/discover/table_reval.go @@ -0,0 +1,223 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . 
+ +package discover + +import ( + "fmt" + "math" + "slices" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +const never = mclock.AbsTime(math.MaxInt64) + +// tableRevalidation implements the node revalidation process. +// It tracks all nodes contained in Table, and schedules sending PING to them. +type tableRevalidation struct { + fast revalidationList + slow revalidationList + activeReq map[enode.ID]struct{} +} + +type revalidationResponse struct { + n *node + newRecord *enode.Node + list *revalidationList + didRespond bool +} + +func (tr *tableRevalidation) init(cfg *Config) { + tr.activeReq = make(map[enode.ID]struct{}) + tr.fast.nextTime = never + tr.fast.interval = cfg.PingInterval + tr.fast.name = "fast" + tr.slow.nextTime = never + tr.slow.interval = cfg.PingInterval * 3 + tr.slow.name = "slow" +} + +// nodeAdded is called when the table receives a new node. +func (tr *tableRevalidation) nodeAdded(tab *Table, n *node) { + tr.fast.push(n, tab.cfg.Clock.Now(), &tab.rand) +} + +// nodeRemoved is called when a node was removed from the table. +func (tr *tableRevalidation) nodeRemoved(n *node) { + if !tr.fast.remove(n) { + tr.slow.remove(n) + } +} + +// run performs node revalidation. +// It returns the next time it should be invoked, which is used in the Table main loop +// to schedule a timer. However, run can be called at any time. +func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) { + if n := tr.fast.get(now, &tab.rand, tr.activeReq); n != nil { + tr.startRequest(tab, &tr.fast, n) + tr.fast.schedule(now, &tab.rand) + } + if n := tr.slow.get(now, &tab.rand, tr.activeReq); n != nil { + tr.startRequest(tab, &tr.slow, n) + tr.slow.schedule(now, &tab.rand) + } + + return min(tr.fast.nextTime, tr.slow.nextTime) +} + +// startRequest spawns a revalidation request for node n. 
+func (tr *tableRevalidation) startRequest(tab *Table, list *revalidationList, n *node) { + if _, ok := tr.activeReq[n.ID()]; ok { + panic(fmt.Errorf("duplicate startRequest (list %q, node %v)", list.name, n.ID())) + } + tr.activeReq[n.ID()] = struct{}{} + resp := revalidationResponse{n: n, list: list} + + // Fetch the node while holding lock. + tab.mutex.Lock() + node := n.Node + tab.mutex.Unlock() + + go tab.doRevalidate(resp, node) +} + +func (tab *Table) doRevalidate(resp revalidationResponse, node *enode.Node) { + // Ping the selected node and wait for a pong response. + remoteSeq, err := tab.net.ping(node) + resp.didRespond = err == nil + + // Also fetch record if the node replied and returned a higher sequence number. + if remoteSeq > node.Seq() { + newrec, err := tab.net.RequestENR(node) + if err != nil { + tab.log.Debug("ENR request failed", "id", node.ID(), "err", err) + } else { + resp.newRecord = newrec + } + } + + select { + case tab.revalResponseCh <- resp: + case <-tab.closed: + } +} + +// handleResponse processes the result of a revalidation request. +func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationResponse) { + now := tab.cfg.Clock.Now() + n := resp.n + b := tab.bucket(n.ID()) + delete(tr.activeReq, n.ID()) + + tab.mutex.Lock() + defer tab.mutex.Unlock() + + if !resp.didRespond { + // Revalidation failed. + n.livenessChecks /= 3 + if n.livenessChecks <= 0 { + tab.deleteInBucket(b, n.ID()) + } else { + tr.moveToList(&tr.fast, resp.list, n, now, &tab.rand) + } + return + } + + // The node responded. + n.livenessChecks++ + n.isValidatedLive = true + var endpointChanged bool + if resp.newRecord != nil { + endpointChanged = tab.bumpInBucket(b, resp.newRecord) + if endpointChanged { + // If the node changed its advertised endpoint, the updated ENR is not served + // until it has been revalidated. 
+ n.isValidatedLive = false + } + } + tab.log.Debug("Revalidated node", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", resp.list.name) + + // Move node over to slow queue after first validation. + if !endpointChanged { + tr.moveToList(&tr.slow, resp.list, n, now, &tab.rand) + } else { + tr.moveToList(&tr.fast, resp.list, n, now, &tab.rand) + } + + // Store potential seeds in database. + if n.isValidatedLive && n.livenessChecks > 5 { + tab.db.UpdateNode(resp.n.Node) + } +} + +func (tr *tableRevalidation) moveToList(dest, source *revalidationList, n *node, now mclock.AbsTime, rand randomSource) { + if source == dest { + return + } + if !source.remove(n) { + panic(fmt.Errorf("moveToList(%q -> %q): node %v not in source list", source.name, dest.name, n.ID())) + } + dest.push(n, now, rand) +} + +// revalidationList holds a list of nodes and the next revalidation time. +type revalidationList struct { + nodes []*node + nextTime mclock.AbsTime + interval time.Duration + name string +} + +// get returns a random node from the queue. Nodes in the 'exclude' map are not returned. 
+func (list *revalidationList) get(now mclock.AbsTime, rand randomSource, exclude map[enode.ID]struct{}) *node { + if now < list.nextTime || len(list.nodes) == 0 { + return nil + } + for i := 0; i < len(list.nodes)*3; i++ { + n := list.nodes[rand.Intn(len(list.nodes))] + _, excluded := exclude[n.ID()] + if !excluded { + return n + } + } + return nil +} + +func (list *revalidationList) schedule(now mclock.AbsTime, rand randomSource) { + list.nextTime = now.Add(time.Duration(rand.Int63n(int64(list.interval)))) +} + +func (list *revalidationList) push(n *node, now mclock.AbsTime, rand randomSource) { + list.nodes = append(list.nodes, n) + if list.nextTime == never { + list.schedule(now, rand) + } +} + +func (list *revalidationList) remove(n *node) bool { + i := slices.Index(list.nodes, n) + if i == -1 { + return false + } + list.nodes = slices.Delete(list.nodes, i, i+1) + if len(list.nodes) == 0 { + list.nextTime = never + } + return true +} diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 3ba3422251..f72ecd94c9 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -20,14 +20,16 @@ import ( "crypto/ecdsa" "fmt" "math/rand" - "net" "reflect" "testing" "testing/quick" "time" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/internal/testlog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" @@ -49,106 +51,109 @@ func TestTable_pingReplace(t *testing.T) { } func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding bool) { + simclock := new(mclock.Simulated) transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{ + Clock: simclock, + Log: testlog.Logger(t, log.LevelTrace), + }) defer db.Close() defer tab.close() <-tab.initDone // Fill up the sender's 
bucket. - pingKey, _ := crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8") - pingSender := wrapNode(enode.NewV4(&pingKey.PublicKey, net.IP{127, 0, 0, 1}, 99, 99)) - last := fillBucket(tab, pingSender) + replacementNodeKey, _ := crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8") + replacementNode := wrapNode(enode.NewV4(&replacementNodeKey.PublicKey, net.IP{127, 0, 0, 1}, 99, 99)) + last := fillBucket(tab, replacementNode.ID()) + tab.mutex.Lock() + nodeEvents := newNodeEventRecorder(128) + tab.nodeAddedHook = nodeEvents.nodeAdded + tab.nodeRemovedHook = nodeEvents.nodeRemoved + tab.mutex.Unlock() - // Add the sender as if it just pinged us. Revalidate should replace the last node in - // its bucket if it is unresponsive. Revalidate again to ensure that + // The revalidation process should replace + // this node in the bucket if it is unresponsive. transport.dead[last.ID()] = !lastInBucketIsResponding - transport.dead[pingSender.ID()] = !newNodeIsResponding - tab.addSeenNode(pingSender) - tab.doRevalidate(make(chan struct{}, 1)) - tab.doRevalidate(make(chan struct{}, 1)) - - if !transport.pinged[last.ID()] { - // Oldest node in bucket is pinged to see whether it is still alive. - t.Error("table did not ping last node in bucket") + transport.dead[replacementNode.ID()] = !newNodeIsResponding + + // Add replacement node to table. + tab.addFoundNode(replacementNode) + + t.Log("last:", last.ID()) + t.Log("replacement:", replacementNode.ID()) + + // Wait until the last node was pinged. 
+ waitForRevalidationPing(t, transport, tab, last.ID()) + + if !lastInBucketIsResponding { + if !nodeEvents.waitNodeAbsent(last.ID(), 2*time.Second) { + t.Error("last node was not removed") + } + if !nodeEvents.waitNodePresent(replacementNode.ID(), 2*time.Second) { + t.Error("replacement node was not added") + } + + // If a replacement is expected, we also need to wait until the replacement node + // was pinged and added/removed. + waitForRevalidationPing(t, transport, tab, replacementNode.ID()) + if !newNodeIsResponding { + if !nodeEvents.waitNodeAbsent(replacementNode.ID(), 2*time.Second) { + t.Error("replacement node was not removed") + } + } } + // Check bucket content. tab.mutex.Lock() defer tab.mutex.Unlock() wantSize := bucketSize if !lastInBucketIsResponding && !newNodeIsResponding { wantSize-- } - if l := len(tab.bucket(pingSender.ID()).entries); l != wantSize { - t.Errorf("wrong bucket size after bond: got %d, want %d", l, wantSize) + bucket := tab.bucket(replacementNode.ID()) + if l := len(bucket.entries); l != wantSize { + t.Errorf("wrong bucket size after revalidation: got %d, want %d", l, wantSize) } - if found := contains(tab.bucket(pingSender.ID()).entries, last.ID()); found != lastInBucketIsResponding { - t.Errorf("last entry found: %t, want: %t", found, lastInBucketIsResponding) + if ok := contains(bucket.entries, last.ID()); ok != lastInBucketIsResponding { + t.Errorf("revalidated node found: %t, want: %t", ok, lastInBucketIsResponding) } wantNewEntry := newNodeIsResponding && !lastInBucketIsResponding - if found := contains(tab.bucket(pingSender.ID()).entries, pingSender.ID()); found != wantNewEntry { - t.Errorf("new entry found: %t, want: %t", found, wantNewEntry) + if ok := contains(bucket.entries, replacementNode.ID()); ok != wantNewEntry { + t.Errorf("replacement node found: %t, want: %t", ok, wantNewEntry) } } -func TestBucket_bumpNoDuplicates(t *testing.T) { - t.Parallel() - cfg := &quick.Config{ - MaxCount: 1000, - Rand: 
rand.New(rand.NewSource(time.Now().Unix())), - Values: func(args []reflect.Value, rand *rand.Rand) { - // generate a random list of nodes. this will be the content of the bucket. - n := rand.Intn(bucketSize-1) + 1 - nodes := make([]*node, n) - for i := range nodes { - nodes[i] = nodeAtDistance(enode.ID{}, 200, intIP(200)) - } - args[0] = reflect.ValueOf(nodes) - // generate random bump positions. - bumps := make([]int, rand.Intn(100)) - for i := range bumps { - bumps[i] = rand.Intn(len(nodes)) - } - args[1] = reflect.ValueOf(bumps) - }, - } - - prop := func(nodes []*node, bumps []int) (ok bool) { - tab, db := newTestTable(newPingRecorder()) - defer db.Close() - defer tab.close() +// waitForRevalidationPing waits until a PING message is sent to a node with the given id. +func waitForRevalidationPing(t *testing.T, transport *pingRecorder, tab *Table, id enode.ID) *enode.Node { + t.Helper() - b := &bucket{entries: make([]*node, len(nodes))} - copy(b.entries, nodes) - for i, pos := range bumps { - tab.bumpInBucket(b, b.entries[pos]) - if hasDuplicates(b.entries) { - t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps)) - for _, n := range b.entries { - t.Logf(" %p", n) - } - return false - } + simclock := tab.cfg.Clock.(*mclock.Simulated) + maxAttempts := tab.len() * 8 + for i := 0; i < maxAttempts; i++ { + simclock.Run(tab.cfg.PingInterval) + p := transport.waitPing(2 * time.Second) + if p == nil { + t.Fatal("Table did not send revalidation ping") + } + if id == (enode.ID{}) || p.ID() == id { + return p } - checkIPLimitInvariant(t, tab) - return true - } - if err := quick.Check(prop, cfg); err != nil { - t.Error(err) } + t.Fatalf("Table did not ping node %v (%d attempts)", id, maxAttempts) + return nil } // This checks that the table-wide IP limit is applied correctly. 
func TestTable_IPLimit(t *testing.T) { transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{}) defer db.Close() defer tab.close() for i := 0; i < tableIPLimit+1; i++ { n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)}) - tab.addSeenNode(n) + tab.addFoundNode(n) } if tab.len() > tableIPLimit { t.Errorf("too many nodes in table") @@ -159,14 +164,14 @@ func TestTable_IPLimit(t *testing.T) { // This checks that the per-bucket IP limit is applied correctly. func TestTable_BucketIPLimit(t *testing.T) { transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{}) defer db.Close() defer tab.close() d := 3 for i := 0; i < bucketIPLimit+1; i++ { n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)}) - tab.addSeenNode(n) + tab.addFoundNode(n) } if tab.len() > bucketIPLimit { t.Errorf("too many nodes in table") @@ -196,7 +201,7 @@ func TestTable_findnodeByID(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{}) defer db.Close() defer tab.close() fillTable(tab, test.All, true) @@ -271,7 +276,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value { } func TestTable_addVerifiedNode(t *testing.T) { - tab, db := newTestTable(newPingRecorder()) + tab, db := newTestTable(newPingRecorder(), Config{}) <-tab.initDone defer db.Close() defer tab.close() @@ -279,31 +284,32 @@ func TestTable_addVerifiedNode(t *testing.T) { // Insert two nodes. 
n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) - tab.addSeenNode(n1) - tab.addSeenNode(n2) + tab.addFoundNode(n1) + tab.addFoundNode(n2) + bucket := tab.bucket(n1.ID()) // Verify bucket content: bcontent := []*node{n1, n2} - if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { - t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries) + if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(bcontent)) { + t.Fatalf("wrong bucket content: %v", bucket.entries) } // Add a changed version of n2. newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) - tab.addVerifiedNode(newn2) + tab.addInboundNode(newn2) // Check that bucket is updated correctly. - newBcontent := []*node{newn2, n1} - if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, newBcontent) { - t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries) + newBcontent := []*node{n1, newn2} + if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(newBcontent)) { + t.Fatalf("wrong bucket content after update: %v", bucket.entries) } checkIPLimitInvariant(t, tab) } func TestTable_addSeenNode(t *testing.T) { - tab, db := newTestTable(newPingRecorder()) + tab, db := newTestTable(newPingRecorder(), Config{}) <-tab.initDone defer db.Close() defer tab.close() @@ -311,8 +317,8 @@ func TestTable_addSeenNode(t *testing.T) { // Insert two nodes. 
n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) - tab.addSeenNode(n1) - tab.addSeenNode(n2) + tab.addFoundNode(n1) + tab.addFoundNode(n2) // Verify bucket content: bcontent := []*node{n1, n2} @@ -324,7 +330,7 @@ func TestTable_addSeenNode(t *testing.T) { newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) - tab.addSeenNode(newn2) + tab.addFoundNode(newn2) // Check that bucket content is unchanged. if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { @@ -337,7 +343,10 @@ func TestTable_addSeenNode(t *testing.T) { // announces a new sequence number, the new record should be pulled. func TestTable_revalidateSyncRecord(t *testing.T) { transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{ + Clock: new(mclock.Simulated), + Log: testlog.Logger(t, log.LevelTrace), + }) <-tab.initDone defer db.Close() defer tab.close() @@ -347,14 +356,18 @@ func TestTable_revalidateSyncRecord(t *testing.T) { r.Set(enr.IP(net.IP{127, 0, 0, 1})) id := enode.ID{1} n1 := wrapNode(enode.SignNull(&r, id)) - tab.addSeenNode(n1) + tab.addFoundNode(n1) // Update the node record. r.Set(enr.WithEntry("foo", "bar")) n2 := enode.SignNull(&r, id) transport.updateRecord(n2) - tab.doRevalidate(make(chan struct{}, 1)) + // Wait for revalidation. We wait for the node to be revalidated two times + // in order to synchronize with the update in the table. 
+ waitForRevalidationPing(t, transport, tab, n2.ID()) + waitForRevalidationPing(t, transport, tab, n2.ID()) + intable := tab.getNode(id) if !reflect.DeepEqual(intable, n2) { t.Fatalf("table contains old record with seq %d, want seq %d", intable.Seq(), n2.Seq()) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index f5d4d39bdb..86c6048094 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -26,6 +26,8 @@ import ( "net" "slices" "sync" + "sync/atomic" + "time" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" @@ -40,8 +42,7 @@ func init() { nullNode = enode.SignNull(&r, enode.ID{}) } -func newTestTable(t transport) (*Table, *enode.DB) { - cfg := Config{} +func newTestTable(t transport, cfg Config) (*Table, *enode.DB) { db, _ := enode.OpenDB("") tab, _ := newTable(t, db, cfg) go tab.loop() @@ -98,11 +99,14 @@ func intIP(i int) net.IP { } // fillBucket inserts nodes into the given bucket until it is full. 
-func fillBucket(tab *Table, n *node) (last *node) { - ld := enode.LogDist(tab.self().ID(), n.ID()) - b := tab.bucket(n.ID()) +func fillBucket(tab *Table, id enode.ID) (last *node) { + ld := enode.LogDist(tab.self().ID(), id) + b := tab.bucket(id) for len(b.entries) < bucketSize { - b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld))) + node := nodeAtDistance(tab.self().ID(), ld, intIP(ld)) + if !tab.addFoundNode(node) { + panic("node not added") + } } return b.entries[bucketSize-1] } @@ -113,16 +117,19 @@ func fillTable(tab *Table, nodes []*node, setLive bool) { for _, n := range nodes { if setLive { n.livenessChecks = 1 + n.isValidatedLive = true } - tab.addSeenNode(n) + tab.addFoundNode(n) } } type pingRecorder struct { - mu sync.Mutex - dead, pinged map[enode.ID]bool - records map[enode.ID]*enode.Node - n *enode.Node + mu sync.Mutex + cond *sync.Cond + dead map[enode.ID]bool + records map[enode.ID]*enode.Node + pinged []*enode.Node + n *enode.Node } func newPingRecorder() *pingRecorder { @@ -130,12 +137,13 @@ func newPingRecorder() *pingRecorder { r.Set(enr.IP{0, 0, 0, 0}) n := enode.SignNull(&r, enode.ID{}) - return &pingRecorder{ + t := &pingRecorder{ dead: make(map[enode.ID]bool), - pinged: make(map[enode.ID]bool), records: make(map[enode.ID]*enode.Node), n: n, } + t.cond = sync.NewCond(&t.mu) + return t } // updateRecord updates a node record. Future calls to ping and @@ -151,12 +159,40 @@ func (t *pingRecorder) Self() *enode.Node { return nullNode } func (t *pingRecorder) lookupSelf() []*enode.Node { return nil } func (t *pingRecorder) lookupRandom() []*enode.Node { return nil } +func (t *pingRecorder) waitPing(timeout time.Duration) *enode.Node { + t.mu.Lock() + defer t.mu.Unlock() + + // Wake up the loop on timeout. + var timedout atomic.Bool + timer := time.AfterFunc(timeout, func() { + timedout.Store(true) + t.cond.Broadcast() + }) + defer timer.Stop() + + // Wait for a ping. 
+ for { + if timedout.Load() { + return nil + } + if len(t.pinged) > 0 { + n := t.pinged[0] + t.pinged = append(t.pinged[:0], t.pinged[1:]...) + return n + } + t.cond.Wait() + } +} + // ping simulates a ping request. func (t *pingRecorder) ping(n *enode.Node) (seq uint64, err error) { t.mu.Lock() defer t.mu.Unlock() - t.pinged[n.ID()] = true + t.pinged = append(t.pinged, n) + t.cond.Broadcast() + if t.dead[n.ID()] { return 0, errTimeout } @@ -256,3 +292,57 @@ func hexEncPubkey(h string) (ret encPubkey) { copy(ret[:], b) return ret } + +type nodeEventRecorder struct { + evc chan recordedNodeEvent +} + +type recordedNodeEvent struct { + node *node + added bool +} + +func newNodeEventRecorder(buffer int) *nodeEventRecorder { + return &nodeEventRecorder{ + evc: make(chan recordedNodeEvent, buffer), + } +} + +func (set *nodeEventRecorder) nodeAdded(b *bucket, n *node) { + select { + case set.evc <- recordedNodeEvent{n, true}: + default: + panic("no space in event buffer") + } +} + +func (set *nodeEventRecorder) nodeRemoved(b *bucket, n *node) { + select { + case set.evc <- recordedNodeEvent{n, false}: + default: + panic("no space in event buffer") + } +} + +func (set *nodeEventRecorder) waitNodePresent(id enode.ID, timeout time.Duration) bool { + return set.waitNodeEvent(id, timeout, true) +} + +func (set *nodeEventRecorder) waitNodeAbsent(id enode.ID, timeout time.Duration) bool { + return set.waitNodeEvent(id, timeout, false) +} + +func (set *nodeEventRecorder) waitNodeEvent(id enode.ID, timeout time.Duration, added bool) bool { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case ev := <-set.evc: + if ev.node.ID() == id && ev.added == added { + return true + } + case <-timer.C: + return false + } + } +} diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 7a0a0f1c77..be6058ec50 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) 
(*UDPv4, error) { log: cfg.Log, } - tab, err := newMeteredTable(t, ln.Database(), cfg) + tab, err := newTable(t, ln.Database(), cfg) if err != nil { return nil, err } @@ -375,6 +375,10 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) { return respN, nil } +func (t *UDPv4) TableBuckets() [][]BucketNode { + return t.tab.Nodes() +} + // pending adds a reply matcher to the pending reply queue. // see the documentation of type replyMatcher for a detailed explanation. func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) *replyMatcher { @@ -669,10 +673,10 @@ func (t *UDPv4) handlePing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I n := wrapNode(enode.NewV4(h.senderKey, from.IP, int(req.From.TCP), from.Port)) if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration { t.sendPing(fromID, from, func() { - t.tab.addVerifiedNode(n) + t.tab.addInboundNode(n) }) } else { - t.tab.addVerifiedNode(n) + t.tab.addInboundNode(n) } // Update node database and endpoint predictor. diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 9b80214f75..9c454d98e3 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -264,7 +264,7 @@ func TestUDPv4_findnode(t *testing.T) { n := wrapNode(enode.NewV4(&key.PublicKey, ip, 0, 2000)) // Ensure half of table content isn't verified live yet. 
if i > numCandidates/2 { - n.livenessChecks = 1 + n.isValidatedLive = true live[n.ID()] = true } nodes.push(n, numCandidates) diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 20a8bccd05..8cdc9dfbce 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -175,7 +175,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { cancelCloseCtx: cancelCloseCtx, } t.talk = newTalkSystem(t) - tab, err := newMeteredTable(t, t.db, cfg) + tab, err := newTable(t, t.db, cfg) if err != nil { return nil, err } @@ -699,7 +699,7 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error { } if fromNode != nil { // Handshake succeeded, add to table. - t.tab.addSeenNode(wrapNode(fromNode)) + t.tab.addInboundNode(wrapNode(fromNode)) } if packet.Kind() != v5wire.WhoareyouPacket { // WHOAREYOU logged separately to report errors. diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index 4373ea8184..0015f7cc70 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -141,7 +141,7 @@ func TestUDPv5_unknownPacket(t *testing.T) { // Make node known. n := test.getNode(test.remotekey, test.remoteaddr).Node() - test.table.addSeenNode(wrapNode(n)) + test.table.addFoundNode(wrapNode(n)) test.packetIn(&v5wire.Unknown{Nonce: nonce}) test.waitPacketOut(func(p *v5wire.Whoareyou, addr *net.UDPAddr, _ v5wire.Nonce) { diff --git a/p2p/server.go b/p2p/server.go index 5b9a4aa71f..79c5fc76b6 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -190,8 +190,8 @@ type Server struct { nodedb *enode.DB localnode *enode.LocalNode - ntab *discover.UDPv4 - DiscV5 *discover.UDPv5 + discv4 *discover.UDPv4 + discv5 *discover.UDPv5 discmix *enode.FairMix dialsched *dialScheduler @@ -400,6 +400,16 @@ func (srv *Server) Self() *enode.Node { return ln.Node() } +// DiscoveryV4 returns the discovery v4 instance, if configured. 
+func (srv *Server) DiscoveryV4() *discover.UDPv4 { + return srv.discv4 +} + +// DiscoveryV5 returns the discovery v5 instance, if configured. +func (srv *Server) DiscoveryV5() *discover.UDPv5 { + return srv.discv5 +} + // Stop terminates the server and all active peer connections. // It blocks until all active connections have been closed. func (srv *Server) Stop() { @@ -547,13 +557,13 @@ func (srv *Server) setupDiscovery() error { ) // If both versions of discovery are running, setup a shared // connection, so v5 can read unhandled messages from v4. - if srv.DiscoveryV4 && srv.DiscoveryV5 { + if srv.Config.DiscoveryV4 && srv.Config.DiscoveryV5 { unhandled = make(chan discover.ReadPacket, 100) sconn = &sharedUDPConn{conn, unhandled} } // Start discovery services. - if srv.DiscoveryV4 { + if srv.Config.DiscoveryV4 { cfg := discover.Config{ PrivateKey: srv.PrivateKey, NetRestrict: srv.NetRestrict, @@ -565,17 +575,17 @@ func (srv *Server) setupDiscovery() error { if err != nil { return err } - srv.ntab = ntab + srv.discv4 = ntab srv.discmix.AddSource(ntab.RandomNodes()) } - if srv.DiscoveryV5 { + if srv.Config.DiscoveryV5 { cfg := discover.Config{ PrivateKey: srv.PrivateKey, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodesV5, Log: srv.log, } - srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg) + srv.discv5, err = discover.ListenV5(sconn, srv.localnode, cfg) if err != nil { return err } @@ -602,8 +612,8 @@ func (srv *Server) setupDialScheduler() { dialer: srv.Dialer, clock: srv.clock, } - if srv.ntab != nil { - config.resolver = srv.ntab + if srv.discv4 != nil { + config.resolver = srv.discv4 } if config.dialer == nil { config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}} @@ -799,11 +809,11 @@ running: srv.log.Trace("P2P networking is spinning down") // Terminate discovery. If there is a running lookup it will terminate soon. 
- if srv.ntab != nil { - srv.ntab.Close() + if srv.discv4 != nil { + srv.discv4.Close() } - if srv.DiscV5 != nil { - srv.DiscV5.Close() + if srv.discv5 != nil { + srv.discv5.Close() } // Disconnect all peers. for _, p := range peers {