diff --git a/p2p/metrics.go b/p2p/metrics.go index 30bd56bd4d..44946473fa 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -20,102 +20,37 @@ package p2p import ( "net" - "sync" - "sync/atomic" - "time" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) const ( - MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter - MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter - MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter - MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter - - MeteredPeerLimit = 1024 // This amount of peers are individually metered + ingressMeterName = "p2p/ingress" + egressMeterName = "p2p/egress" ) var ( - ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections - ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic - egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections - egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic - activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) // Gauge tracking the current peer count - - PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress - PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress - - meteredPeerFeed event.Feed // Event feed for peer metrics - meteredPeerCount int32 // Actually stored peer connection count -) - -// MeteredPeerEventType is the type of peer events emitted by a metered connection. -type MeteredPeerEventType int - -const ( - // PeerHandshakeSucceeded is the type of event - // emitted when a peer successfully makes the handshake. - PeerHandshakeSucceeded MeteredPeerEventType = iota - - // PeerHandshakeFailed is the type of event emitted when a peer fails to - // make the handshake or disconnects before it. - PeerHandshakeFailed - - // PeerDisconnected is the type of event emitted when a peer disconnects. - PeerDisconnected + ingressConnectMeter = metrics.NewRegisteredMeter("p2p/serves", nil) + ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil) + egressConnectMeter = metrics.NewRegisteredMeter("p2p/dials", nil) + egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil) + activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) ) -// MeteredPeerEvent is an event emitted when peers connect or disconnect. -type MeteredPeerEvent struct { - Type MeteredPeerEventType // Type of peer event - Addr string // TCP address of the peer - Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection - Peer *Peer // Connected remote node instance - Ingress uint64 // Ingress count at the moment of the event - Egress uint64 // Egress count at the moment of the event -} - -// SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events -// if metrics collection is enabled. -func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription { - return meteredPeerFeed.Subscribe(ch) -} - // meteredConn is a wrapper around a net.Conn that meters both the // inbound and outbound network traffic. type meteredConn struct { - net.Conn // Network connection to wrap with metering - - connected time.Time // Connection time of the peer - addr *net.TCPAddr // TCP address of the peer - peer *Peer // Peer instance - - // trafficMetered denotes if the peer is registered in the traffic registries. - // Its value is true if the metered peer count doesn't reach the limit in the - // moment of the peer's connection. - trafficMetered bool - ingressMeter metrics.Meter // Meter for the read bytes of the peer - egressMeter metrics.Meter // Meter for the written bytes of the peer - - lock sync.RWMutex // Lock protecting the metered connection's internals + net.Conn } // newMeteredConn creates a new metered connection, bumps the ingress or egress // connection meter and also increases the metered peer count. If the metrics -// system is disabled or the IP address is unspecified, this function returns -// the original object. +// system is disabled, function returns the original connection. func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn { // Short circuit if metrics are disabled if !metrics.Enabled { return conn } - if addr == nil || addr.IP.IsUnspecified() { - log.Warn("Peer address is unspecified") - return conn - } // Bump the connection counters and wrap the connection if ingress { ingressConnectMeter.Mark(1) @@ -123,12 +58,7 @@ func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn { egressConnectMeter.Mark(1) } activePeerGauge.Inc(1) - - return &meteredConn{ - Conn: conn, - addr: addr, - connected: time.Now(), - } + return &meteredConn{Conn: conn} } // Read delegates a network read to the underlying connection, bumping the common @@ -136,11 +66,6 @@ func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn { func (c *meteredConn) Read(b []byte) (n int, err error) { n, err = c.Conn.Read(b) ingressTrafficMeter.Mark(int64(n)) - c.lock.RLock() - if c.trafficMetered { - c.ingressMeter.Mark(int64(n)) - } - c.lock.RUnlock() return n, err } @@ -149,84 +74,15 @@ func (c *meteredConn) Read(b []byte) (n int, err error) { func (c *meteredConn) Write(b []byte) (n int, err error) { n, err = c.Conn.Write(b) egressTrafficMeter.Mark(int64(n)) - c.lock.RLock() - if c.trafficMetered { - c.egressMeter.Mark(int64(n)) - } - c.lock.RUnlock() return n, err } -// handshakeDone is called after the connection passes the handshake. -func (c *meteredConn) handshakeDone(peer *Peer) { - if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit { - // Don't register the peer in the traffic registries. - atomic.AddInt32(&meteredPeerCount, -1) - c.lock.Lock() - c.peer, c.trafficMetered = peer, false - c.lock.Unlock() - log.Warn("Metered peer count reached the limit") - } else { - enode := peer.Node().String() - c.lock.Lock() - c.peer, c.trafficMetered = peer, true - c.ingressMeter = metrics.NewRegisteredMeter(enode, PeerIngressRegistry) - c.egressMeter = metrics.NewRegisteredMeter(enode, PeerEgressRegistry) - c.lock.Unlock() - } - meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerHandshakeSucceeded, - Addr: c.addr.String(), - Peer: peer, - Elapsed: time.Since(c.connected), - }) -} - // Close delegates a close operation to the underlying connection, unregisters // the peer from the traffic registries and emits close event. func (c *meteredConn) Close() error { err := c.Conn.Close() - c.lock.RLock() - if c.peer == nil { - // If the peer disconnects before/during the handshake. - c.lock.RUnlock() - meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerHandshakeFailed, - Addr: c.addr.String(), - Elapsed: time.Since(c.connected), - }) - activePeerGauge.Dec(1) - return err - } - peer := c.peer - if !c.trafficMetered { - // If the peer isn't registered in the traffic registries. - c.lock.RUnlock() - meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerDisconnected, - Addr: c.addr.String(), - Peer: peer, - }) + if err == nil { activePeerGauge.Dec(1) - return err } - ingress, egress, enode := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()), c.peer.Node().String() - c.lock.RUnlock() - - // Decrement the metered peer count - atomic.AddInt32(&meteredPeerCount, -1) - - // Unregister the peer from the traffic registries - PeerIngressRegistry.Unregister(enode) - PeerEgressRegistry.Unregister(enode) - - meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerDisconnected, - Addr: c.addr.String(), - Peer: peer, - Ingress: ingress, - Egress: egress, - }) - activePeerGauge.Dec(1) return err } diff --git a/p2p/peer.go b/p2p/peer.go index 9a9788bc17..4398ad0f23 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -302,7 +302,8 @@ func (p *Peer) handle(msg Msg) error { return fmt.Errorf("msg code out of range: %v", msg.Code) } if metrics.Enabled { - metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize)) + m := fmt.Sprintf("%s/%s/%d/%#02x", ingressMeterName, proto.Name, proto.Version, msg.Code-proto.offset) + metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize)) } select { case proto.in <- msg: diff --git a/p2p/rlpx.go b/p2p/rlpx.go index c9ca6ea42f..c134aec1de 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -595,7 +595,8 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error { } msg.meterSize = msg.Size if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages - metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize)) + m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode) + metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize)) } // write header headbuf := make([]byte, 32) diff --git a/p2p/server.go b/p2p/server.go index 1ed1be2ac6..c87b7758df 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -755,9 +755,6 @@ running: peers[c.node.ID()] = p srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", truncateName(c.name)) srv.dialsched.peerAdded(c) - if conn, ok := c.fd.(*meteredConn); ok { - conn.handshakeDone(p) - } if p.Inbound() { inboundCount++ }