From e3430ac7df8a7c232f6374120eb1297c7fdfe5ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 2 Feb 2021 10:44:36 +0200 Subject: [PATCH] eth: check snap satelliteness, delegate drop to eth (#22235) * eth: check snap satelliteness, delegate drop to eth * eth: better handle eth/snap satellite relation, merge reg/unreg paths --- eth/handler.go | 106 +++++++------ eth/handler_eth.go | 4 +- eth/handler_eth_test.go | 4 +- eth/handler_snap.go | 8 +- eth/peer.go | 9 +- eth/peerset.go | 261 +++++++++++++-------------------- eth/protocols/eth/handler.go | 6 +- eth/protocols/eth/protocol.go | 8 +- eth/protocols/snap/handler.go | 6 +- eth/protocols/snap/protocol.go | 8 +- eth/sync.go | 4 +- eth/sync_test.go | 2 +- p2p/peer.go | 11 +- 13 files changed, 201 insertions(+), 236 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 5ae0925bb..a5a62b894 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -218,7 +218,7 @@ func newHandler(config *handlerConfig) (*handler, error) { h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) fetchTx := func(peer string, hashes []common.Hash) error { - p := h.peers.ethPeer(peer) + p := h.peers.peer(peer) if p == nil { return errors.New("unknown peer") } @@ -229,8 +229,17 @@ func newHandler(config *handlerConfig) (*handler, error) { return h, nil } -// runEthPeer +// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to +// various subsistems and starts handling messages. func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { + // If the peer has a `snap` extension, wait for it to connect so we can have + // a uniform initialization/teardown mechanism + snap, err := h.peers.waitSnapExtension(peer) + if err != nil { + peer.Log().Error("Snapshot extension barrier failed", "err", err) + return err + } + // TODO(karalabe): Not sure why this is needed if !h.chainSync.handlePeerEvent(peer) { return p2p.DiscQuitting } @@ -251,37 +260,46 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return err } reject := false // reserved peer slots - if atomic.LoadUint32(&h.snapSync) == 1 && !peer.SupportsCap("snap", 1) { - // If we are running snap-sync, we want to reserve roughly half the peer - // slots for peers supporting the snap protocol. - // The logic here is; we only allow up to 5 more non-snap peers than snap-peers. - if all, snp := h.peers.Len(), h.peers.SnapLen(); all-snp > snp+5 { - reject = true + if atomic.LoadUint32(&h.snapSync) == 1 { + if snap == nil { + // If we are running snap-sync, we want to reserve roughly half the peer + // slots for peers supporting the snap protocol. + // The logic here is; we only allow up to 5 more non-snap peers than snap-peers. + if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 { + reject = true + } } } // Ignore maxPeers if this is a trusted peer if !peer.Peer.Info().Network.Trusted { - if reject || h.peers.Len() >= h.maxPeers { + if reject || h.peers.len() >= h.maxPeers { return p2p.DiscTooManyPeers } } peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) // Register the peer locally - if err := h.peers.registerEthPeer(peer); err != nil { + if err := h.peers.registerPeer(peer, snap); err != nil { peer.Log().Error("Ethereum peer registration failed", "err", err) return err } defer h.removePeer(peer.ID()) - p := h.peers.ethPeer(peer.ID()) + p := h.peers.peer(peer.ID()) if p == nil { return errors.New("peer dropped during handling") } // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil { + peer.Log().Error("Failed to register peer in eth syncer", "err", err) return err } + if snap != nil { + if err := h.downloader.SnapSyncer.Register(snap); err != nil { + peer.Log().Error("Failed to register peer in snap syncer", "err", err) + return err + } + } h.chainSync.handlePeerEvent(peer) // Propagate existing transactions. new transactions appearing @@ -317,25 +335,23 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return handler(peer) } -// runSnapPeer -func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error { +// runSnapExtension registers a `snap` peer into the joint eth/snap peerset and +// starts handling inbound messages. As `snap` is only a satellite protocol to +// `eth`, all subsystem registrations and lifecycle management will be done by +// the main `eth` handler to prevent strange races. +func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error { h.peerWG.Add(1) defer h.peerWG.Done() - // Register the peer locally - if err := h.peers.registerSnapPeer(peer); err != nil { - peer.Log().Error("Snapshot peer registration failed", "err", err) - return err - } - defer h.removePeer(peer.ID()) - - if err := h.downloader.SnapSyncer.Register(peer); err != nil { + if err := h.peers.registerSnapExtension(peer); err != nil { + peer.Log().Error("Snapshot extension registration failed", "err", err) return err } - // Handle incoming messages until the connection is torn down return handler(peer) } +// removePeer unregisters a peer from the downloader and fetchers, removes it from +// the set of tracked peers and closes the network connection to it. func (h *handler) removePeer(id string) { // Create a custom logger to avoid printing the entire id var logger log.Logger @@ -345,33 +361,27 @@ func (h *handler) removePeer(id string) { } else { logger = log.New("peer", id[:8]) } - // Remove the eth peer if it exists - eth := h.peers.ethPeer(id) - if eth != nil { - logger.Debug("Removing Ethereum peer") - h.downloader.UnregisterPeer(id) - h.txFetcher.Drop(id) - - if err := h.peers.unregisterEthPeer(id); err != nil { - logger.Error("Ethereum peer removal failed", "err", err) - } + // Abort if the peer does not exist + peer := h.peers.peer(id) + if peer == nil { + logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered) + return } - // Remove the snap peer if it exists - snap := h.peers.snapPeer(id) - if snap != nil { - logger.Debug("Removing Snapshot peer") + // Remove the `eth` peer if it exists + logger.Debug("Removing Ethereum peer", "snap", peer.snapExt != nil) + + // Remove the `snap` extension if it exists + if peer.snapExt != nil { h.downloader.SnapSyncer.Unregister(id) - if err := h.peers.unregisterSnapPeer(id); err != nil { - logger.Error("Snapshot peer removel failed", "err", err) - } - } - // Hard disconnect at the networking layer - if eth != nil { - eth.Peer.Disconnect(p2p.DiscUselessPeer) } - if snap != nil { - snap.Peer.Disconnect(p2p.DiscUselessPeer) + h.downloader.UnregisterPeer(id) + h.txFetcher.Drop(id) + + if err := h.peers.unregisterPeer(id); err != nil { + logger.Error("Ethereum peer removal failed", "err", err) } + // Hard disconnect at the networking layer + peer.Peer.Disconnect(p2p.DiscUselessPeer) } func (h *handler) Start(maxPeers int) { @@ -417,7 +427,7 @@ func (h *handler) Stop() { // will only announce its availability (depending what's requested). func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { hash := block.Hash() - peers := h.peers.ethPeersWithoutBlock(hash) + peers := h.peers.peersWithoutBlock(hash) // If propagation is requested, send to a subset of the peer if propagate { @@ -456,7 +466,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) // Broadcast transactions to a batch of peers not knowing about it if propagate { for _, tx := range txs { - peers := h.peers.ethPeersWithoutTransaction(tx.Hash()) + peers := h.peers.peersWithoutTransaction(tx.Hash()) // Send the block to a subset of our peers transfer := peers[:int(math.Sqrt(float64(len(peers))))] @@ -472,7 +482,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) } // Otherwise only broadcast the announcement to peers for _, tx := range txs { - peers := h.peers.ethPeersWithoutTransaction(tx.Hash()) + peers := h.peers.peersWithoutTransaction(tx.Hash()) for _, peer := range peers { annos[peer] = append(annos[peer], tx.Hash()) } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 84bdac659..3ff9f2245 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -47,7 +47,7 @@ func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { // PeerInfo retrieves all known `eth` information about a peer. func (h *ethHandler) PeerInfo(id enode.ID) interface{} { - if p := h.peers.ethPeer(id.String()); p != nil { + if p := h.peers.peer(id.String()); p != nil { return p.info() } return nil @@ -107,7 +107,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // handleHeaders is invoked from a peer's message handler when it transmits a batch // of headers for the local node to process. func (h *ethHandler) handleHeaders(peer *eth.Peer, headers []*types.Header) error { - p := h.peers.ethPeer(peer.ID()) + p := h.peers.peer(peer.ID()) if p == nil { return errors.New("unregistered during callback") } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 0e5c0c90e..5f5d4e9e8 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -574,11 +574,11 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo // Verify that the remote peer is maintained or dropped if drop { - if peers := handler.handler.peers.Len(); peers != 0 { + if peers := handler.handler.peers.len(); peers != 0 { t.Fatalf("peer count mismatch: have %d, want %d", peers, 0) } } else { - if peers := handler.handler.peers.Len(); peers != 1 { + if peers := handler.handler.peers.len(); peers != 1 { t.Fatalf("peer count mismatch: have %d, want %d", peers, 1) } } diff --git a/eth/handler_snap.go b/eth/handler_snap.go index 25975bf60..767416ffd 100644 --- a/eth/handler_snap.go +++ b/eth/handler_snap.go @@ -30,13 +30,15 @@ func (h *snapHandler) Chain() *core.BlockChain { return h.chain } // RunPeer is invoked when a peer joins on the `snap` protocol. func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error { - return (*handler)(h).runSnapPeer(peer, hand) + return (*handler)(h).runSnapExtension(peer, hand) } // PeerInfo retrieves all known `snap` information about a peer. func (h *snapHandler) PeerInfo(id enode.ID) interface{} { - if p := h.peers.snapPeer(id.String()); p != nil { - return p.info() + if p := h.peers.peer(id.String()); p != nil { + if p.snapExt != nil { + return p.snapExt.info() + } } return nil } diff --git a/eth/peer.go b/eth/peer.go index 6970c8afd..1cea9c640 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -36,9 +36,11 @@ type ethPeerInfo struct { // ethPeer is a wrapper around eth.Peer to maintain a few extra metadata. type ethPeer struct { *eth.Peer + snapExt *snapPeer // Satellite `snap` connection - syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time - lock sync.RWMutex // Mutex protecting the internal fields + syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time + snapWait chan struct{} // Notification channel for snap connections + lock sync.RWMutex // Mutex protecting the internal fields } // info gathers and returns some `eth` protocol metadata known about a peer. @@ -61,9 +63,6 @@ type snapPeerInfo struct { // snapPeer is a wrapper around snap.Peer to maintain a few extra metadata. type snapPeer struct { *snap.Peer - - ethDrop *time.Timer // Connection dropper if `eth` doesn't connect in time - lock sync.RWMutex // Mutex protecting the internal fields } // info gathers and returns some `snap` protocol metadata known about a peer. diff --git a/eth/peerset.go b/eth/peerset.go index 663c5ce36..f0657e140 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -20,12 +20,10 @@ import ( "errors" "math/big" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" ) @@ -42,22 +40,19 @@ var ( // a peer set, but no peer with the given id exists. errPeerNotRegistered = errors.New("peer not registered") - // ethConnectTimeout is the `snap` timeout for `eth` to connect too. - ethConnectTimeout = 3 * time.Second + // errSnapWithoutEth is returned if a peer attempts to connect only on the + // snap protocol without advertizing the eth main protocol. + errSnapWithoutEth = errors.New("peer connected on snap without compatible eth support") ) // peerSet represents the collection of active peers currently participating in -// the `eth` or `snap` protocols. +// the `eth` protocol, with or without the `snap` extension. type peerSet struct { - ethPeers map[string]*ethPeer // Peers connected on the `eth` protocol - snapPeers map[string]*snapPeer // Peers connected on the `snap` protocol + peers map[string]*ethPeer // Peers connected on the `eth` protocol + snapPeers int // Number of `snap` compatible peers for connection prioritization - ethJoinFeed event.Feed // Events when an `eth` peer successfully joins - ethDropFeed event.Feed // Events when an `eth` peer gets dropped - snapJoinFeed event.Feed // Events when a `snap` peer joins on both `eth` and `snap` - snapDropFeed event.Feed // Events when a `snap` peer gets dropped (only if fully joined) - - scope event.SubscriptionScope // Subscription group to unsubscribe everyone at once + snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension + snapPend map[string]*snap.Peer // Peers connected on the `snap` protocol, but not yet on `eth` lock sync.RWMutex closed bool @@ -66,176 +61,134 @@ type peerSet struct { // newPeerSet creates a new peer set to track the active participants. func newPeerSet() *peerSet { return &peerSet{ - ethPeers: make(map[string]*ethPeer), - snapPeers: make(map[string]*snapPeer), + peers: make(map[string]*ethPeer), + snapWait: make(map[string]chan *snap.Peer), + snapPend: make(map[string]*snap.Peer), } } -// subscribeEthJoin registers a subscription for peers joining (and completing -// the handshake) on the `eth` protocol. -func (ps *peerSet) subscribeEthJoin(ch chan<- *eth.Peer) event.Subscription { - return ps.scope.Track(ps.ethJoinFeed.Subscribe(ch)) -} - -// subscribeEthDrop registers a subscription for peers being dropped from the -// `eth` protocol. -func (ps *peerSet) subscribeEthDrop(ch chan<- *eth.Peer) event.Subscription { - return ps.scope.Track(ps.ethDropFeed.Subscribe(ch)) -} - -// subscribeSnapJoin registers a subscription for peers joining (and completing -// the `eth` join) on the `snap` protocol. -func (ps *peerSet) subscribeSnapJoin(ch chan<- *snap.Peer) event.Subscription { - return ps.scope.Track(ps.snapJoinFeed.Subscribe(ch)) -} - -// subscribeSnapDrop registers a subscription for peers being dropped from the -// `snap` protocol. -func (ps *peerSet) subscribeSnapDrop(ch chan<- *snap.Peer) event.Subscription { - return ps.scope.Track(ps.snapDropFeed.Subscribe(ch)) -} - -// registerEthPeer injects a new `eth` peer into the working set, or returns an -// error if the peer is already known. The peer is announced on the `eth` join -// feed and if it completes a pending `snap` peer, also on that feed. -func (ps *peerSet) registerEthPeer(peer *eth.Peer) error { - ps.lock.Lock() - if ps.closed { - ps.lock.Unlock() - return errPeerSetClosed +// registerSnapExtension unblocks an already connected `eth` peer waiting for its +// `snap` extension, or if no such peer exists, tracks the extension for the time +// being until the `eth` main protocol starts looking for it. +func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error { + // Reject the peer if it advertises `snap` without `eth` as `snap` is only a + // satellite protocol meaningful with the chain selection of `eth` + if !peer.SupportsCap(eth.ProtocolName, eth.ProtocolVersions) { + return errSnapWithoutEth } + // Ensure nobody can double connect + ps.lock.Lock() + defer ps.lock.Unlock() + id := peer.ID() - if _, ok := ps.ethPeers[id]; ok { - ps.lock.Unlock() - return errPeerAlreadyRegistered + if _, ok := ps.peers[id]; ok { + return errPeerAlreadyRegistered // avoid connections with the same id as existing ones } - ps.ethPeers[id] = ðPeer{Peer: peer} - - snap, ok := ps.snapPeers[id] - ps.lock.Unlock() - - if ok { - // Previously dangling `snap` peer, stop it's timer since `eth` connected - snap.lock.Lock() - if snap.ethDrop != nil { - snap.ethDrop.Stop() - snap.ethDrop = nil - } - snap.lock.Unlock() + if _, ok := ps.snapPend[id]; ok { + return errPeerAlreadyRegistered // avoid connections with the same id as pending ones } - ps.ethJoinFeed.Send(peer) - if ok { - ps.snapJoinFeed.Send(snap.Peer) + // Inject the peer into an `eth` counterpart is available, otherwise save for later + if wait, ok := ps.snapWait[id]; ok { + delete(ps.snapWait, id) + wait <- peer + return nil } + ps.snapPend[id] = peer return nil } -// unregisterEthPeer removes a remote peer from the active set, disabling any further -// actions to/from that particular entity. The drop is announced on the `eth` drop -// feed and also on the `snap` feed if the eth/snap duality was broken just now. -func (ps *peerSet) unregisterEthPeer(id string) error { +// waitExtensions blocks until all satellite protocols are connected and tracked +// by the peerset. +func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) { + // If the peer does not support a compatible `snap`, don't wait + if !peer.SupportsCap(snap.ProtocolName, snap.ProtocolVersions) { + return nil, nil + } + // Ensure nobody can double connect ps.lock.Lock() - eth, ok := ps.ethPeers[id] - if !ok { + + id := peer.ID() + if _, ok := ps.peers[id]; ok { ps.lock.Unlock() - return errPeerNotRegistered + return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones + } + if _, ok := ps.snapWait[id]; ok { + ps.lock.Unlock() + return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones } - delete(ps.ethPeers, id) + // If `snap` already connected, retrieve the peer from the pending set + if snap, ok := ps.snapPend[id]; ok { + delete(ps.snapPend, id) - snap, ok := ps.snapPeers[id] + ps.lock.Unlock() + return snap, nil + } + // Otherwise wait for `snap` to connect concurrently + wait := make(chan *snap.Peer) + ps.snapWait[id] = wait ps.lock.Unlock() - ps.ethDropFeed.Send(eth) - if ok { - ps.snapDropFeed.Send(snap) - } - return nil + return <-wait, nil } -// registerSnapPeer injects a new `snap` peer into the working set, or returns -// an error if the peer is already known. The peer is announced on the `snap` -// join feed if it completes an existing `eth` peer. -// -// If the peer isn't yet connected on `eth` and fails to do so within a given -// amount of time, it is dropped. This enforces that `snap` is an extension to -// `eth`, not a standalone leeching protocol. -func (ps *peerSet) registerSnapPeer(peer *snap.Peer) error { +// registerPeer injects a new `eth` peer into the working set, or returns an error +// if the peer is already known. +func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer) error { + // Start tracking the new peer ps.lock.Lock() + defer ps.lock.Unlock() + if ps.closed { - ps.lock.Unlock() return errPeerSetClosed } id := peer.ID() - if _, ok := ps.snapPeers[id]; ok { - ps.lock.Unlock() + if _, ok := ps.peers[id]; ok { return errPeerAlreadyRegistered } - ps.snapPeers[id] = &snapPeer{Peer: peer} - - _, ok := ps.ethPeers[id] - if !ok { - // Dangling `snap` peer, start a timer to drop if `eth` doesn't connect - ps.snapPeers[id].ethDrop = time.AfterFunc(ethConnectTimeout, func() { - peer.Log().Warn("Snapshot peer missing eth, dropping", "addr", peer.RemoteAddr(), "type", peer.Name()) - peer.Disconnect(p2p.DiscUselessPeer) - }) + eth := ðPeer{ + Peer: peer, } - ps.lock.Unlock() - - if ok { - ps.snapJoinFeed.Send(peer) + if ext != nil { + eth.snapExt = &snapPeer{ext} + ps.snapPeers++ } + ps.peers[id] = eth return nil } -// unregisterSnapPeer removes a remote peer from the active set, disabling any -// further actions to/from that particular entity. The drop is announced on the -// `snap` drop feed. -func (ps *peerSet) unregisterSnapPeer(id string) error { +// unregisterPeer removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) unregisterPeer(id string) error { ps.lock.Lock() - peer, ok := ps.snapPeers[id] + defer ps.lock.Unlock() + + peer, ok := ps.peers[id] if !ok { - ps.lock.Unlock() return errPeerNotRegistered } - delete(ps.snapPeers, id) - ps.lock.Unlock() - - peer.lock.Lock() - if peer.ethDrop != nil { - peer.ethDrop.Stop() - peer.ethDrop = nil + delete(ps.peers, id) + if peer.snapExt != nil { + ps.snapPeers-- } - peer.lock.Unlock() - - ps.snapDropFeed.Send(peer) return nil } -// ethPeer retrieves the registered `eth` peer with the given id. -func (ps *peerSet) ethPeer(id string) *ethPeer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - return ps.ethPeers[id] -} - -// snapPeer retrieves the registered `snap` peer with the given id. -func (ps *peerSet) snapPeer(id string) *snapPeer { +// peer retrieves the registered peer with the given id. +func (ps *peerSet) peer(id string) *ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() - return ps.snapPeers[id] + return ps.peers[id] } -// ethPeersWithoutBlock retrieves a list of `eth` peers that do not have a given -// block in their set of known hashes so it might be propagated to them. -func (ps *peerSet) ethPeersWithoutBlock(hash common.Hash) []*ethPeer { +// peersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes so it might be propagated to them. +func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*ethPeer, 0, len(ps.ethPeers)) - for _, p := range ps.ethPeers { + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { if !p.KnownBlock(hash) { list = append(list, p) } @@ -243,14 +196,14 @@ func (ps *peerSet) ethPeersWithoutBlock(hash common.Hash) []*ethPeer { return list } -// ethPeersWithoutTransaction retrieves a list of `eth` peers that do not have a -// given transaction in their set of known hashes. -func (ps *peerSet) ethPeersWithoutTransaction(hash common.Hash) []*ethPeer { +// peersWithoutTransaction retrieves a list of peers that do not have a given +// transaction in their set of known hashes. +func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*ethPeer, 0, len(ps.ethPeers)) - for _, p := range ps.ethPeers { + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { if !p.KnownTransaction(hash) { list = append(list, p) } @@ -258,28 +211,27 @@ func (ps *peerSet) ethPeersWithoutTransaction(hash common.Hash) []*ethPeer { return list } -// Len returns if the current number of `eth` peers in the set. Since the `snap` +// len returns if the current number of `eth` peers in the set. Since the `snap` // peers are tied to the existence of an `eth` connection, that will always be a // subset of `eth`. -func (ps *peerSet) Len() int { +func (ps *peerSet) len() int { ps.lock.RLock() defer ps.lock.RUnlock() - return len(ps.ethPeers) + return len(ps.peers) } -// SnapLen returns if the current number of `snap` peers in the set. Since the `snap` -// peers are tied to the existence of an `eth` connection, that will always be a -// subset of `eth`. -func (ps *peerSet) SnapLen() int { +// snapLen returns if the current number of `snap` peers in the set. +func (ps *peerSet) snapLen() int { ps.lock.RLock() defer ps.lock.RUnlock() - return len(ps.snapPeers) + + return ps.snapPeers } -// ethPeerWithHighestTD retrieves the known peer with the currently highest total +// peerWithHighestTD retrieves the known peer with the currently highest total // difficulty. -func (ps *peerSet) ethPeerWithHighestTD() *eth.Peer { +func (ps *peerSet) peerWithHighestTD() *eth.Peer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -287,7 +239,7 @@ func (ps *peerSet) ethPeerWithHighestTD() *eth.Peer { bestPeer *eth.Peer bestTd *big.Int ) - for _, p := range ps.ethPeers { + for _, p := range ps.peers { if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { bestPeer, bestTd = p.Peer, td } @@ -300,10 +252,7 @@ func (ps *peerSet) close() { ps.lock.Lock() defer ps.lock.Unlock() - for _, p := range ps.ethPeers { - p.Disconnect(p2p.DiscQuitting) - } - for _, p := range ps.snapPeers { + for _, p := range ps.peers { p.Disconnect(p2p.DiscQuitting) } ps.closed = true diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 25ddcd93e..e32008fb4 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -103,12 +103,12 @@ type TxPool interface { // MakeProtocols constructs the P2P protocol definitions for `eth`. func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { - protocols := make([]p2p.Protocol, len(protocolVersions)) - for i, version := range protocolVersions { + protocols := make([]p2p.Protocol, len(ProtocolVersions)) + for i, version := range ProtocolVersions { version := version // Closure protocols[i] = p2p.Protocol{ - Name: protocolName, + Name: ProtocolName, Version: version, Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 63d3494ec..9fff64b72 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -34,13 +34,13 @@ const ( ETH65 = 65 ) -// protocolName is the official short name of the `eth` protocol used during +// ProtocolName is the official short name of the `eth` protocol used during // devp2p capability negotiation. -const protocolName = "eth" +const ProtocolName = "eth" -// protocolVersions are the supported versions of the `eth` protocol (first +// ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var protocolVersions = []uint{ETH65, ETH64} +var ProtocolVersions = []uint{ETH65, ETH64} // protocolLengths are the number of implemented message corresponding to // different protocol versions. diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index 36322e648..24c859955 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -77,12 +77,12 @@ type Backend interface { // MakeProtocols constructs the P2P protocol definitions for `snap`. func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { - protocols := make([]p2p.Protocol, len(protocolVersions)) - for i, version := range protocolVersions { + protocols := make([]p2p.Protocol, len(ProtocolVersions)) + for i, version := range ProtocolVersions { version := version // Closure protocols[i] = p2p.Protocol{ - Name: protocolName, + Name: ProtocolName, Version: version, Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/eth/protocols/snap/protocol.go b/eth/protocols/snap/protocol.go index f1a25a206..a74142caf 100644 --- a/eth/protocols/snap/protocol.go +++ b/eth/protocols/snap/protocol.go @@ -30,13 +30,13 @@ const ( snap1 = 1 ) -// protocolName is the official short name of the `snap` protocol used during +// ProtocolName is the official short name of the `snap` protocol used during // devp2p capability negotiation. -const protocolName = "snap" +const ProtocolName = "snap" -// protocolVersions are the supported versions of the `snap` protocol (first +// ProtocolVersions are the supported versions of the `snap` protocol (first // is primary). -var protocolVersions = []uint{snap1} +var ProtocolVersions = []uint{snap1} // protocolLengths are the number of implemented message corresponding to // different protocol versions. diff --git a/eth/sync.go b/eth/sync.go index eedb8b747..dc72e8838 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -247,11 +247,11 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp { } else if minPeers > cs.handler.maxPeers { minPeers = cs.handler.maxPeers } - if cs.handler.peers.Len() < minPeers { + if cs.handler.peers.len() < minPeers { return nil } // We have enough peers, check TD - peer := cs.handler.peers.ethPeerWithHighestTD() + peer := cs.handler.peers.peerWithHighestTD() if peer == nil { return nil } diff --git a/eth/sync_test.go b/eth/sync_test.go index 473e19518..9cc806b18 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -70,7 +70,7 @@ func testFastSyncDisabling(t *testing.T, protocol uint) { time.Sleep(250 * time.Millisecond) // Check that fast sync was disabled - op := peerToSyncOp(downloader.FastSync, empty.handler.peers.ethPeerWithHighestTD()) + op := peerToSyncOp(downloader.FastSync, empty.handler.peers.peerWithHighestTD()) if err := empty.handler.doSync(op); err != nil { t.Fatal("sync failed:", err) } diff --git a/p2p/peer.go b/p2p/peer.go index 43ccef5c4..08881e258 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -158,11 +158,16 @@ func (p *Peer) Caps() []Cap { return p.rw.caps } -// SupportsCap returns true if the peer supports the given protocol/version -func (p *Peer) SupportsCap(protocol string, version uint) bool { +// SupportsCap returns true if the peer supports any of the enumerated versions +// of a specific protocol. +func (p *Peer) SupportsCap(protocol string, versions []uint) bool { for _, cap := range p.rw.caps { if cap.Name == protocol { - return version <= cap.Version + for _, ver := range versions { + if cap.Version == ver { + return true + } + } } } return false