From ac86547b0154129f4dac865579946bd2e9378fb0 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 31 May 2023 13:37:10 +0200 Subject: [PATCH] p2p/discover: add Table configuration and Nodes method (#27387) * p2p/discover: remove ReadRandomNodes Even though it's public, this method is not callable by code outside of package p2p/discover because one can't get a valid instance of Table. * p2p/discover: add Table.Nodes * p2p/discover: make Table settings configurable In unit tests and externally developed cmd/devp2p test runs, it can be useful to tune the timer intervals used by Table. --- p2p/discover/common.go | 29 +++++++---- p2p/discover/table.go | 85 +++++++++++++++++++-------------- p2p/discover/table_test.go | 35 -------------- p2p/discover/table_util_test.go | 4 +- p2p/discover/v4_udp.go | 2 +- p2p/discover/v5_udp.go | 2 +- 6 files changed, 72 insertions(+), 85 deletions(-) diff --git a/p2p/discover/common.go b/p2p/discover/common.go index c36e8dcc3a..c9f0477def 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -19,6 +19,7 @@ package discover import ( "crypto/ecdsa" "net" + "time" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" @@ -35,29 +36,39 @@ type UDPConn interface { LocalAddr() net.Addr } -type V5Config struct { - ProtocolID *[6]byte -} - // Config holds settings for the discovery listener. type Config struct { // These settings are required and configure the UDP listener: PrivateKey *ecdsa.PrivateKey - // These settings are optional: + // All remaining settings are optional. + + // Packet handling configuration: NetRestrict *netutil.Netlist // list of allowed IP networks - Bootnodes []*enode.Node // list of bootstrap nodes Unhandled chan<- ReadPacket // unhandled packets are sent on this channel - Log log.Logger // if set, log messages go here - // V5ProtocolID configures the discv5 protocol identifier. - V5ProtocolID *[6]byte + // Node table configuration: + Bootnodes []*enode.Node // list of bootstrap nodes + PingInterval time.Duration // speed of node liveness check + RefreshInterval time.Duration // used in bucket refresh + // The options below are useful in very specific cases, like in unit tests. + V5ProtocolID *[6]byte + Log log.Logger // if set, log messages go here ValidSchemes enr.IdentityScheme // allowed identity schemes Clock mclock.Clock } func (cfg Config) withDefaults() Config { + // Node table configuration: + if cfg.PingInterval == 0 { + cfg.PingInterval = 10 * time.Second + } + if cfg.RefreshInterval == 0 { + cfg.RefreshInterval = 30 * time.Minute + } + + // Debug/test settings: if cfg.Log == nil { cfg.Log = log.Root() } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 41d5ac6e34..1397348aec 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -53,12 +53,10 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - refreshInterval = 30 * time.Minute - revalidateInterval = 10 * time.Second - copyNodesInterval = 30 * time.Second - seedMinTableTime = 5 * time.Minute - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + copyNodesInterval = 30 * time.Second + seedMinTableTime = 5 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps @@ -71,9 +69,12 @@ type Table struct { rand *mrand.Rand // source of randomness, periodically reseeded ips netutil.DistinctNetSet - log log.Logger - db *enode.DB // database of known nodes - net transport + db *enode.DB // database of known nodes + net transport + cfg Config + log log.Logger + + // loop channels refreshReq chan chan struct{} initDone chan struct{} closeReq chan struct{} @@ -99,19 +100,21 @@ type bucket struct { ips netutil.DistinctNetSet } -func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) { +func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { + cfg = cfg.withDefaults() tab := &Table{ net: t, db: db, + cfg: cfg, + log: cfg.Log, refreshReq: make(chan chan struct{}), initDone: make(chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), rand: mrand.New(mrand.NewSource(0)), ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - log: log, } - if err := tab.setFallbackNodes(bootnodes); err != nil { + if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil { return nil, err } for i := range tab.buckets { @@ -125,25 +128,12 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger return tab, nil } -func (tab *Table) self() *enode.Node { - return tab.net.Self() -} - -func (tab *Table) seedRand() { - var b [8]byte - crand.Read(b[:]) - - tab.mutex.Lock() - tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) - tab.mutex.Unlock() -} - -// ReadRandomNodes fills the given slice with random nodes from the table. The results -// are guaranteed to be unique for a single invocation, no node will appear twice. -func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { +// Nodes returns all nodes contained in the table. +func (tab *Table) Nodes() []*enode.Node { if !tab.isInitDone() { - return 0 + return nil } + tab.mutex.Lock() defer tab.mutex.Unlock() @@ -153,12 +143,20 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { nodes = append(nodes, unwrapNode(n)) } } - // Shuffle. - for i := 0; i < len(nodes); i++ { - j := tab.rand.Intn(len(nodes)) - nodes[i], nodes[j] = nodes[j], nodes[i] - } - return copy(buf, nodes) + return nodes +} + +func (tab *Table) self() *enode.Node { + return tab.net.Self() +} + +func (tab *Table) seedRand() { + var b [8]byte + crand.Read(b[:]) + + tab.mutex.Lock() + tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) + tab.mutex.Unlock() } // getNode returns the node with the given ID or nil if it isn't in the table. @@ -218,7 +216,7 @@ func (tab *Table) refresh() <-chan struct{} { func (tab *Table) loop() { var ( revalidate = time.NewTimer(tab.nextRevalidateTime()) - refresh = time.NewTicker(refreshInterval) + refresh = time.NewTimer(tab.nextRefreshTime()) copyNodes = time.NewTicker(copyNodesInterval) refreshDone = make(chan struct{}) // where doRefresh reports completion revalidateDone chan struct{} // where doRevalidate reports completion @@ -251,6 +249,7 @@ loop: close(ch) } waiting, refreshDone = nil, nil + refresh.Reset(tab.nextRefreshTime()) case <-revalidate.C: revalidateDone = make(chan struct{}) go tab.doRevalidate(revalidateDone) @@ -373,7 +372,15 @@ func (tab *Table) nextRevalidateTime() time.Duration { tab.mutex.Lock() defer tab.mutex.Unlock() - return time.Duration(tab.rand.Int63n(int64(revalidateInterval))) + return time.Duration(tab.rand.Int63n(int64(tab.cfg.PingInterval))) +} + +func (tab *Table) nextRefreshTime() time.Duration { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + half := tab.cfg.RefreshInterval / 2 + return half + time.Duration(tab.rand.Int63n(int64(half))) } // copyLiveNodes adds nodes from the table to the database if they have been in the table @@ -481,10 +488,12 @@ func (tab *Table) addSeenNode(n *node) { // Can't add: IP limit reached. return } + // Add to end of bucket: b.entries = append(b.entries, n) b.replacements = deleteNode(b.replacements, n) n.addedAt = time.Now() + if tab.nodeAddedHook != nil { tab.nodeAddedHook(n) } @@ -523,10 +532,12 @@ func (tab *Table) addVerifiedNode(n *node) { // Can't add: IP limit reached. return } + // Add to front of bucket. b.entries, _ = pushNode(b.entries, n, bucketSize) b.replacements = deleteNode(b.replacements, n) n.addedAt = time.Now() + if tab.nodeAddedHook != nil { tab.nodeAddedHook(n) } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 1ef63fe010..2781dd4225 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -247,41 +247,6 @@ func TestTable_findnodeByID(t *testing.T) { } } -func TestTable_ReadRandomNodesGetAll(t *testing.T) { - cfg := &quick.Config{ - MaxCount: 200, - Rand: rand.New(rand.NewSource(time.Now().Unix())), - Values: func(args []reflect.Value, rand *rand.Rand) { - args[0] = reflect.ValueOf(make([]*enode.Node, rand.Intn(1000))) - }, - } - test := func(buf []*enode.Node) bool { - transport := newPingRecorder() - tab, db := newTestTable(transport) - defer db.Close() - defer tab.close() - <-tab.initDone - - for i := 0; i < len(buf); i++ { - ld := cfg.Rand.Intn(len(tab.buckets)) - fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))}) - } - gotN := tab.ReadRandomNodes(buf) - if gotN != tab.len() { - t.Errorf("wrong number of nodes, got %d, want %d", gotN, tab.len()) - return false - } - if hasDuplicates(wrapNodes(buf[:gotN])) { - t.Errorf("result contains duplicates") - return false - } - return true - } - if err := quick.Check(test, cfg); err != nil { - t.Error(err) - } -} - type closeTest struct { Self enode.ID Target enode.ID diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 527afbb77e..5b5e9a0431 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -28,7 +28,6 @@ import ( "sync" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" ) @@ -42,8 +41,9 @@ func init() { } func newTestTable(t transport) (*Table, *enode.DB) { + cfg := Config{} db, _ := enode.OpenDB("") - tab, _ := newTable(t, db, nil, log.Root()) + tab, _ := newTable(t, db, cfg) go tab.loop() return tab, db } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 21fee9c793..d61a52c395 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { log: cfg.Log, } - tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log) + tab, err := newTable(t, ln.Database(), cfg) if err != nil { return nil, err } diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 6a86e72a50..7bed9dbcfd 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -174,7 +174,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { cancelCloseCtx: cancelCloseCtx, } t.talk = newTalkSystem(t) - tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log) + tab, err := newTable(t, t.db, cfg) if err != nil { return nil, err }