From 96ae35e2ac8c360781407d7294081aabdcbb3652 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 24 Feb 2017 09:58:04 +0100 Subject: [PATCH] p2p, p2p/discover, p2p/nat: rework logging using context keys --- p2p/dial.go | 13 ++--- p2p/discover/database.go | 8 +-- p2p/discover/node.go | 5 ++ p2p/discover/ntp.go | 13 +---- p2p/discover/table.go | 14 ++--- p2p/discover/udp.go | 46 +++++++++------- p2p/nat/nat.go | 11 ++-- p2p/peer.go | 43 +++++++-------- p2p/peer_error.go | 34 +++++++----- p2p/peer_test.go | 20 ++++--- p2p/server.go | 115 ++++++++++++++++++++++----------------- 11 files changed, 171 insertions(+), 151 deletions(-) diff --git a/p2p/dial.go b/p2p/dial.go index 65180e0294..bb3befab24 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -133,7 +133,7 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now var newtasks []task addDial := func(flag connFlag, n *discover.Node) bool { if err := s.checkDial(n, peers); err != nil { - log.Debug(fmt.Sprintf("skipping dial candidate %x@%v:%d: %v", n.ID[:8], n.IP, n.TCP, err)) + log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err) return false } s.dialing[n.ID] = flag @@ -162,7 +162,7 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now err := s.checkDial(t.dest, peers) switch err { case errNotWhitelisted, errSelf: - log.Debug(fmt.Sprintf("removing static dial candidate %x@%v:%d: %v", t.dest.ID[:8], t.dest.IP, t.dest.TCP, err)) + log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err) delete(s.static, t.dest.ID) case nil: s.dialing[id] = t.flags @@ -266,7 +266,7 @@ func (t *dialTask) Do(srv *Server) { // The backoff delay resets when the node is found. func (t *dialTask) resolve(srv *Server) bool { if srv.ntab == nil { - log.Debug(fmt.Sprintf("can't resolve node %x: discovery is disabled", t.dest.ID[:6])) + log.Debug("Can't resolve node", "id", t.dest.ID, "err", "discovery is disabled") return false } if t.resolveDelay == 0 { @@ -282,23 +282,22 @@ func (t *dialTask) resolve(srv *Server) bool { if t.resolveDelay > maxResolveDelay { t.resolveDelay = maxResolveDelay } - log.Debug(fmt.Sprintf("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay)) + log.Debug("Resolving node failed", "id", t.dest.ID, "newdelay", t.resolveDelay) return false } // The node was found. t.resolveDelay = initialResolveDelay t.dest = resolved - log.Debug(fmt.Sprintf("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP)) + log.Debug("Resolved node", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}) return true } // dial performs the actual connection attempt. func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} - log.Debug(fmt.Sprintf("dial tcp %v (%x)", addr, dest.ID[:6])) fd, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { - log.Trace(fmt.Sprintf("%v", err)) + log.Trace("Dial error", "task", t, "err", err) return false } mfd := newMeteredConn(fd, false) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index a8b32d31e8..7206a63c63 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -23,7 +23,6 @@ import ( "bytes" "crypto/rand" "encoding/binary" - "fmt" "os" "sync" "time" @@ -180,12 +179,11 @@ func (db *nodeDB) storeInt64(key []byte, n int64) error { func (db *nodeDB) node(id NodeID) *Node { blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil) if err != nil { - log.Trace(fmt.Sprintf("failed to retrieve node %v: %v", id, err)) return nil } node := new(Node) if err := rlp.DecodeBytes(blob, node); err != nil { - log.Warn(fmt.Sprintf("failed to decode node RLP: %v", err)) + log.Error("Failed to decode node RLP", "err", err) return nil } node.sha = crypto.Keccak256Hash(node.ID[:]) @@ -233,7 +231,7 @@ func (db *nodeDB) expirer() { select { case <-tick: if err := db.expireNodes(); err != nil { - log.Error(fmt.Sprintf("Failed to expire nodedb items: %v", err)) + log.Error("Failed to expire nodedb items", "err", err) } case <-db.quit: @@ -352,7 +350,7 @@ func nextNode(it iterator.Iterator) *Node { } var n Node if err := rlp.DecodeBytes(it.Value(), &n); err != nil { - log.Warn(fmt.Sprintf("invalid node %x: %v", id, err)) + log.Warn("Failed to decode node RLP", "id", id, "err", err) continue } return &n diff --git a/p2p/discover/node.go b/p2p/discover/node.go index f0262762ef..6a7ab814e1 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -221,6 +221,11 @@ func (n NodeID) GoString() string { return fmt.Sprintf("discover.HexID(\"%x\")", n[:]) } +// TerminalString returns a shortened hex string for terminal logging. +func (n NodeID) TerminalString() string { + return hex.EncodeToString(n[:8]) +} + // HexID converts a hex string to a NodeID. // The string may be prefixed with 0x. func HexID(in string) (NodeID, error) { diff --git a/p2p/discover/ntp.go b/p2p/discover/ntp.go index df67e1c5bd..1bb52399fb 100644 --- a/p2p/discover/ntp.go +++ b/p2p/discover/ntp.go @@ -23,7 +23,6 @@ import ( "fmt" "net" "sort" - "strings" "time" "github.com/ethereum/go-ethereum/log" @@ -50,16 +49,10 @@ func checkClockDrift() { return } if drift < -driftThreshold || drift > driftThreshold { - warning := fmt.Sprintf("System clock seems off by %v, which can prevent network connectivity", drift) - howtofix := fmt.Sprintf("Please enable network time synchronisation in system settings") - separator := strings.Repeat("-", len(warning)) - - log.Warn(fmt.Sprint(separator)) - log.Warn(fmt.Sprint(warning)) - log.Warn(fmt.Sprint(howtofix)) - log.Warn(fmt.Sprint(separator)) + log.Warn(fmt.Sprintf("System clock seems off by %v, which can prevent network connectivity", drift)) + log.Warn("Please enable network time synchronisation in system settings.") } else { - log.Debug(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift)) + log.Debug("NTP sanity check done", "drift", drift) } } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 03392b5636..2f5a26c34c 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -277,10 +277,10 @@ func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { // Bump the failure counter to detect and evacuate non-bonded entries fails := tab.db.findFails(n.ID) + 1 tab.db.updateFindFails(n.ID, fails) - log.Trace(fmt.Sprintf("Bumping failures for %x: %d", n.ID[:8], fails)) + log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails) if fails >= maxFindnodeFailures { - log.Trace(fmt.Sprintf("Evacuating node %x: %d findnode failures", n.ID[:8], fails)) + log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails) tab.delete(n) } } @@ -385,13 +385,11 @@ func (tab *Table) doRefresh(done chan struct{}) { seeds = tab.bondall(append(seeds, tab.nursery...)) if len(seeds) == 0 { - log.Debug(fmt.Sprintf("no seed nodes found")) + log.Debug("No discv4 seed nodes found") } for _, n := range seeds { - log.Debug("", "msg", log.Lazy{Fn: func() string { - age := time.Since(tab.db.lastPong(n.ID)) - return fmt.Sprintf("seed node (age %v): %v", age, n) - }}) + age := log.Lazy{Fn: func() time.Duration { return time.Since(tab.db.lastPong(n.ID)) }} + log.Trace("Found seed node in database", "id", n.ID, "addr", n.addr(), "age", age) } tab.mutex.Lock() tab.stuff(seeds) @@ -470,7 +468,7 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 var result error age := time.Since(tab.db.lastPong(id)) if node == nil || fails > 0 || age > nodeDBNodeExpiration { - log.Trace(fmt.Sprintf("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age)) + log.Trace("Starting bonding ping/pong", "id", id, "known", node != nil, "failcount", fails, "age", age) tab.bondmu.Lock() w := tab.bonding[id] diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index eafc3f394e..93545e7d55 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -147,6 +147,7 @@ func nodeToRPC(n *Node) rpcNode { type packet interface { handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error + name() string } type conn interface { @@ -223,7 +224,7 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP if err != nil { return nil, err } - log.Info(fmt.Sprint("Listening,", tab.self)) + log.Debug("UDP listener up", "self", tab.self) return tab, nil } @@ -269,7 +270,7 @@ func (t *udp) close() { func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error { // TODO: maybe check for ReplyTo field in callback to measure RTT errc := t.pending(toid, pongPacket, func(interface{}) bool { return true }) - t.send(toaddr, pingPacket, ping{ + t.send(toaddr, pingPacket, &ping{ Version: Version, From: t.ourEndpoint, To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB @@ -293,14 +294,14 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node nreceived++ n, err := t.nodeFromRPC(toaddr, rn) if err != nil { - log.Trace(fmt.Sprintf("invalid neighbor node (%v) from %v: %v", rn.IP, toaddr, err)) + log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err) continue } nodes = append(nodes, n) } return nreceived >= bucketSize }) - t.send(toaddr, findnodePacket, findnode{ + t.send(toaddr, findnodePacket, &findnode{ Target: target, Expiration: uint64(time.Now().Add(expiration).Unix()), }) @@ -458,15 +459,13 @@ func init() { } } -func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error { +func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) error { packet, err := encodePacket(t.priv, ptype, req) if err != nil { return err } - log.Trace(fmt.Sprintf(">>> %v %T", toaddr, req)) - if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil { - log.Trace(fmt.Sprint("UDP send failed:", err)) - } + _, err = t.conn.WriteToUDP(packet, toaddr) + log.Trace(">> "+req.name(), "addr", toaddr, "err", err) return err } @@ -475,13 +474,13 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, b.Write(headSpace) b.WriteByte(ptype) if err := rlp.Encode(b, req); err != nil { - log.Error(fmt.Sprint("error encoding packet:", err)) + log.Error("Can't encode discv4 packet", "err", err) return nil, err } packet := b.Bytes() sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv) if err != nil { - log.Error(fmt.Sprint("could not sign packet:", err)) + log.Error("Can't sign discv4 packet", "err", err) return nil, err } copy(packet[macSize:], sig) @@ -503,11 +502,11 @@ func (t *udp) readLoop() { nbytes, from, err := t.conn.ReadFromUDP(buf) if netutil.IsTemporaryError(err) { // Ignore temporary read errors. - log.Debug(fmt.Sprintf("Temporary read error: %v", err)) + log.Debug("Temporary UDP read error", "err", err) continue } else if err != nil { // Shut down the loop for permament errors. - log.Debug(fmt.Sprintf("Read error: %v", err)) + log.Debug("UDP read error", "err", err) return } t.handlePacket(from, buf[:nbytes]) @@ -517,14 +516,11 @@ func (t *udp) readLoop() { func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error { packet, fromID, hash, err := decodePacket(buf) if err != nil { - log.Debug(fmt.Sprintf("Bad packet from %v: %v", from, err)) + log.Debug("Bad discv4 packet", "addr", from, "err", err) return err } - status := "ok" - if err = packet.handle(t, from, fromID, hash); err != nil { - status = err.Error() - } - log.Trace(fmt.Sprintf("<<< %v %T: %s", from, packet, status)) + err = packet.handle(t, from, fromID, hash) + log.Trace("<< "+packet.name(), "addr", from, "err", err) return err } @@ -563,7 +559,7 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er if expired(req.Expiration) { return errExpired } - t.send(from, pongPacket, pong{ + t.send(from, pongPacket, &pong{ To: makeEndpoint(from, req.From.TCP), ReplyTok: mac, Expiration: uint64(time.Now().Add(expiration).Unix()), @@ -575,6 +571,8 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er return nil } +func (req *ping) name() string { return "PING/v4" } + func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { if expired(req.Expiration) { return errExpired @@ -585,6 +583,8 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er return nil } +func (req *pong) name() string { return "PONG/v4" } + func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { if expired(req.Expiration) { return errExpired @@ -613,13 +613,15 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte } p.Nodes = append(p.Nodes, nodeToRPC(n)) if len(p.Nodes) == maxNeighbors || i == len(closest)-1 { - t.send(from, neighborsPacket, p) + t.send(from, neighborsPacket, &p) p.Nodes = p.Nodes[:0] } } return nil } +func (req *findnode) name() string { return "FINDNODE/v4" } + func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { if expired(req.Expiration) { return errExpired @@ -630,6 +632,8 @@ func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byt return nil } +func (req *neighbors) name() string { return "NEIGHBORS/v4" } + func expired(ts uint64) bool { return time.Unix(int64(ts), 0).Before(time.Now()) } diff --git a/p2p/nat/nat.go b/p2p/nat/nat.go index e5883cf98d..a254648c66 100644 --- a/p2p/nat/nat.go +++ b/p2p/nat/nat.go @@ -98,16 +98,17 @@ const ( // Map adds a port mapping on m and keeps it alive until c is closed. // This function is typically invoked in its own goroutine. func Map(m Interface, c chan struct{}, protocol string, extport, intport int, name string) { + log := log.New("proto", protocol, "extport", extport, "intport", intport, "interface", m) refresh := time.NewTimer(mapUpdateInterval) defer func() { refresh.Stop() - log.Debug(fmt.Sprintf("deleting port mapping: %s %d -> %d (%s) using %s", protocol, extport, intport, name, m)) + log.Debug("Deleting port mapping") m.DeleteMapping(protocol, extport, intport) }() if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil { - log.Debug(fmt.Sprintf("network port %s:%d could not be mapped: %v", protocol, intport, err)) + log.Debug("Couldn't add port mapping", "err", err) } else { - log.Info(fmt.Sprintf("mapped network port %s:%d -> %d (%s) using %s", protocol, extport, intport, name, m)) + log.Info("Mapped network port") } for { select { @@ -116,9 +117,9 @@ func Map(m Interface, c chan struct{}, protocol string, extport, intport int, na return } case <-refresh.C: - log.Trace(fmt.Sprintf("refresh port mapping %s:%d -> %d (%s) using %s", protocol, extport, intport, name, m)) + log.Trace("Refreshing port mapping") if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil { - log.Debug(fmt.Sprintf("network port %s:%d could not be mapped: %v", protocol, intport, err)) + log.Debug("Couldn't add port mapping", "err", err) } refresh.Reset(mapUpdateInterval) } diff --git a/p2p/peer.go b/p2p/peer.go index 5d09927a5a..a9c20189a4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,7 +17,6 @@ package p2p import ( - "errors" "fmt" "io" "net" @@ -25,6 +24,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" @@ -64,6 +64,8 @@ type protoHandshake struct { type Peer struct { rw *conn running map[string]*protoRW + log log.Logger + created mclock.AbsTime wg sync.WaitGroup protoErr chan error @@ -125,20 +127,25 @@ func newPeer(conn *conn, protocols []Protocol) *Peer { p := &Peer{ rw: conn, running: protomap, + created: mclock.Now(), disc: make(chan DiscReason), protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), + log: log.New("id", conn.id, "conn", conn.flags), } return p } -func (p *Peer) run() DiscReason { +func (p *Peer) Log() log.Logger { + return p.log +} + +func (p *Peer) run() (remoteRequested bool, err error) { var ( writeStart = make(chan struct{}, 1) writeErr = make(chan error, 1) readErr = make(chan error, 1) - reason DiscReason - requested bool + reason DiscReason // sent to the peer ) p.wg.Add(2) go p.readLoop(readErr) @@ -152,31 +159,26 @@ func (p *Peer) run() DiscReason { loop: for { select { - case err := <-writeErr: + case err = <-writeErr: // A write finished. Allow the next write to start if // there was no error. if err != nil { - log.Trace(fmt.Sprintf("%v: write error: %v", p, err)) reason = DiscNetworkError break loop } writeStart <- struct{}{} - case err := <-readErr: + case err = <-readErr: if r, ok := err.(DiscReason); ok { - log.Debug(fmt.Sprintf("%v: remote requested disconnect: %v", p, r)) - requested = true + remoteRequested = true reason = r } else { - log.Trace(fmt.Sprintf("%v: read error: %v", p, err)) reason = DiscNetworkError } break loop - case err := <-p.protoErr: + case err = <-p.protoErr: reason = discReasonForError(err) - log.Debug(fmt.Sprintf("%v: protocol error: %v (%v)", p, err, reason)) break loop - case reason = <-p.disc: - log.Debug(fmt.Sprintf("%v: locally requested disconnect: %v", p, reason)) + case err = <-p.disc: break loop } } @@ -184,10 +186,7 @@ loop: close(p.closed) p.rw.close(reason) p.wg.Wait() - if requested { - reason = DiscRequested - } - return reason + return remoteRequested, err } func (p *Peer) pingLoop() { @@ -297,14 +296,14 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr - log.Trace(fmt.Sprintf("%v: Starting protocol %s/%d", p, proto.Name, proto.Version)) + p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) go func() { err := proto.Run(p, proto) if err == nil { - log.Trace(fmt.Sprintf("%v: Protocol %s/%d returned", p, proto.Name, proto.Version)) - err = errors.New("protocol returned") + p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) + err = errProtocolReturned } else if err != io.EOF { - log.Trace(fmt.Sprintf("%v: Protocol %s/%d error: %v", p, proto.Name, proto.Version, err)) + p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err p.wg.Done() diff --git a/p2p/peer_error.go b/p2p/peer_error.go index 62c7b665dd..a1cddb707b 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -17,6 +17,7 @@ package p2p import ( + "errors" "fmt" ) @@ -51,6 +52,8 @@ func (self *peerError) Error() string { return self.message } +var errProtocolReturned = errors.New("protocol returned") + type DiscReason uint const ( @@ -70,24 +73,24 @@ const ( ) var discReasonToString = [...]string{ - DiscRequested: "Disconnect requested", - DiscNetworkError: "Network error", - DiscProtocolError: "Breach of protocol", - DiscUselessPeer: "Useless peer", - DiscTooManyPeers: "Too many peers", - DiscAlreadyConnected: "Already connected", - DiscIncompatibleVersion: "Incompatible P2P protocol version", - DiscInvalidIdentity: "Invalid node identity", - DiscQuitting: "Client quitting", - DiscUnexpectedIdentity: "Unexpected identity", - DiscSelf: "Connected to self", - DiscReadTimeout: "Read timeout", - DiscSubprotocolError: "Subprotocol error", + DiscRequested: "disconnect requested", + DiscNetworkError: "network error", + DiscProtocolError: "breach of protocol", + DiscUselessPeer: "useless peer", + DiscTooManyPeers: "too many peers", + DiscAlreadyConnected: "already connected", + DiscIncompatibleVersion: "incompatible p2p protocol version", + DiscInvalidIdentity: "invalid node identity", + DiscQuitting: "client quitting", + DiscUnexpectedIdentity: "unexpected identity", + DiscSelf: "connected to self", + DiscReadTimeout: "read timeout", + DiscSubprotocolError: "subprotocol error", } func (d DiscReason) String() string { if len(discReasonToString) < int(d) { - return fmt.Sprintf("Unknown Reason(%d)", d) + return fmt.Sprintf("unknown disconnect reason %d", d) } return discReasonToString[d] } @@ -100,6 +103,9 @@ func discReasonForError(err error) DiscReason { if reason, ok := err.(DiscReason); ok { return reason } + if err == errProtocolReturned { + return DiscQuitting + } peerError, ok := err.(*peerError) if ok { switch peerError.code { diff --git a/p2p/peer_test.go b/p2p/peer_test.go index f44300b15c..a3e1c74fd8 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -43,7 +43,7 @@ var discard = Protocol{ }, } -func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) { +func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) { fd1, fd2 := net.Pipe() c1 := &conn{fd: fd1, transport: newTestTransport(randomID(), fd1)} c2 := &conn{fd: fd2, transport: newTestTransport(randomID(), fd2)} @@ -53,15 +53,17 @@ func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) { } peer := newPeer(c1, protos) - errc := make(chan DiscReason, 1) - go func() { errc <- peer.run() }() + errc := make(chan error, 1) + go func() { + _, err := peer.run() + errc <- err + }() closer := func() { c2.close(errors.New("close func called")) } return closer, c2, peer, errc } func TestPeerProtoReadMsg(t *testing.T) { - done := make(chan struct{}) proto := Protocol{ Name: "a", Length: 5, @@ -75,7 +77,6 @@ func TestPeerProtoReadMsg(t *testing.T) { if err := ExpectMsg(rw, 4, []uint{3}); err != nil { t.Error(err) } - close(done) return nil }, } @@ -88,9 +89,10 @@ func TestPeerProtoReadMsg(t *testing.T) { Send(rw, baseProtocolLength+4, []uint{3}) select { - case <-done: case err := <-errc: - t.Errorf("peer returned: %v", err) + if err != errProtocolReturned { + t.Errorf("peer returned error: %v", err) + } case <-time.After(2 * time.Second): t.Errorf("receive timeout") } @@ -137,8 +139,8 @@ func TestPeerDisconnect(t *testing.T) { } select { case reason := <-disc: - if reason != DiscRequested { - t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested) + if reason != DiscQuitting { + t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscQuitting) } case <-time.After(500 * time.Millisecond): t.Error("peer did not return") diff --git a/p2p/server.go b/p2p/server.go index 9f1478a410..48b4e8be3d 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -162,12 +164,18 @@ type Server struct { removestatic chan *discover.Node posthandshake chan *conn addpeer chan *conn - delpeer chan *Peer + delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop } type peerOpFunc func(map[discover.NodeID]*Peer) +type peerDrop struct { + *Peer + err error + requested bool // true if signaled by the peer +} + type connFlag int const ( @@ -204,9 +212,9 @@ type transport interface { } func (c *conn) String() string { - s := c.flags.String() + " conn" + s := c.flags.String() if (c.id != discover.NodeID{}) { - s += fmt.Sprintf(" %x", c.id[:8]) + s += " " + c.id.String() } s += " " + c.fd.RemoteAddr().String() return s @@ -215,16 +223,16 @@ func (c *conn) String() string { func (f connFlag) String() string { s := "" if f&trustedConn != 0 { - s += " trusted" + s += "-trusted" } if f&dynDialedConn != 0 { - s += " dyn dial" + s += "-dyndial" } if f&staticDialedConn != 0 { - s += " static dial" + s += "-staticdial" } if f&inboundConn != 0 { - s += " inbound" + s += "-inbound" } if s != "" { s = s[1:] @@ -288,26 +296,30 @@ func (srv *Server) Self() *discover.Node { srv.lock.Lock() defer srv.lock.Unlock() - // If the server's not running, return an empty node if !srv.running { return &discover.Node{IP: net.ParseIP("0.0.0.0")} } - // If the node is running but discovery is off, manually assemble the node infos - if srv.ntab == nil { - // Inbound connections disabled, use zero address - if srv.listener == nil { + return srv.makeSelf(srv.listener, srv.ntab) +} + +func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *discover.Node { + // If the server's not running, return an empty node. + // If the node is running but discovery is off, manually assemble the node infos. + if ntab == nil { + // Inbound connections disabled, use zero address. + if listener == nil { return &discover.Node{IP: net.ParseIP("0.0.0.0"), ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)} } // Otherwise inject the listener address too - addr := srv.listener.Addr().(*net.TCPAddr) + addr := listener.Addr().(*net.TCPAddr) return &discover.Node{ ID: discover.PubkeyID(&srv.PrivateKey.PublicKey), IP: addr.IP, TCP: uint16(addr.Port), } } - // Otherwise return the live node infos - return srv.ntab.Self() + // Otherwise return the discovery node. + return ntab.Self() } // Stop terminates the server and all active peer connections. @@ -336,7 +348,7 @@ func (srv *Server) Start() (err error) { return errors.New("server already running") } srv.running = true - log.Info(fmt.Sprint("Starting Server")) + log.Info("Starting P2P networking") // static fields if srv.PrivateKey == nil { @@ -350,7 +362,7 @@ func (srv *Server) Start() (err error) { } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) - srv.delpeer = make(chan *Peer) + srv.delpeer = make(chan peerDrop) srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *discover.Node) srv.removestatic = make(chan *discover.Node) @@ -398,7 +410,7 @@ func (srv *Server) Start() (err error) { } } if srv.NoDial && srv.ListenAddr == "" { - log.Warn(fmt.Sprint("I will be kind-of useless, neither dialing nor listening.")) + log.Warn("P2P server will be useless, neither dialing nor listening") } srv.loopWG.Add(1) @@ -466,7 +478,7 @@ func (srv *Server) run(dialstate dialer) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] - log.Trace(fmt.Sprint("new task:", t)) + log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } @@ -489,19 +501,18 @@ running: select { case <-srv.quit: // The server was stopped. Run the cleanup logic. - log.Trace(fmt.Sprint("<-quit: spinning down")) break running case n := <-srv.addstatic: // This channel is used by AddPeer to add to the // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. - log.Trace(fmt.Sprint("<-addstatic:", n)) + log.Debug("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected - log.Trace(fmt.Sprint("<-removestatic:", n)) + log.Debug("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) @@ -514,7 +525,7 @@ running: // A task got done. Tell dialstate about it so it // can update its state and remove it from the active // tasks list. - log.Trace(fmt.Sprint("<-taskdone:", t)) + log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: @@ -524,19 +535,17 @@ running: // Ensure that the trusted flag is set before checking against MaxPeers. c.flags |= trustedConn } - log.Trace(fmt.Sprint("<-posthandshake:", c)) // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. c.cont <- srv.encHandshakeChecks(peers, c) case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. - log.Trace(fmt.Sprint("<-addpeer:", c)) err := srv.protoHandshakeChecks(peers, c) - if err != nil { - log.Trace(fmt.Sprintf("Not adding %v as peer: %v", c, err)) - } else { + if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) + name := truncateName(c.name) + log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p go srv.runPeer(p) } @@ -544,13 +553,16 @@ running: // dial tasks complete after the peer has been added or // discarded. Unblock the task last. c.cont <- err - case p := <-srv.delpeer: + case pd := <-srv.delpeer: // A peer disconnected. - log.Trace(fmt.Sprint("<-delpeer:", p)) - delete(peers, p.ID()) + d := common.PrettyDuration(mclock.Now() - pd.created) + pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err) + delete(peers, pd.ID()) } } + log.Trace("P2P networking is spinning down") + // Terminate discovery. If there is a running lookup it will terminate soon. if srv.ntab != nil { srv.ntab.Close() @@ -565,10 +577,9 @@ running: // Wait for peers to shut down. Pending connections and tasks are // not handled here and will terminate soon-ish because srv.quit // is closed. - log.Trace(fmt.Sprintf("ignoring %d pending tasks at spindown", len(runningTasks))) for len(peers) > 0 { p := <-srv.delpeer - log.Trace(fmt.Sprint("<-delpeer (spindown):", p)) + p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks)) delete(peers, p.ID()) } } @@ -604,7 +615,7 @@ type tempError interface { // inbound connections. func (srv *Server) listenLoop() { defer srv.loopWG.Done() - log.Info(fmt.Sprint("Listening on", srv.listener.Addr())) + log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) // This channel acts as a semaphore limiting // active inbound connections that are lingering pre-handshake. @@ -629,10 +640,10 @@ func (srv *Server) listenLoop() { for { fd, err = srv.listener.Accept() if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { - log.Debug(fmt.Sprintf("Temporary read error: %v", err)) + log.Debug("Temporary read error", "err", err) continue } else if err != nil { - log.Debug(fmt.Sprintf("Read error: %v", err)) + log.Debug("Read error", "err", err) return } break @@ -641,7 +652,7 @@ func (srv *Server) listenLoop() { // Reject connections that do not match NetRestrict. if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { - log.Debug(fmt.Sprintf("Rejected conn %v because it is not whitelisted in NetRestrict", fd.RemoteAddr())) + log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue @@ -649,7 +660,7 @@ func (srv *Server) listenLoop() { } fd = newMeteredConn(fd, true) - log.Debug(fmt.Sprintf("Accepted conn %v", fd.RemoteAddr())) + log.Trace("Accepted connection", "addr", fd.RemoteAddr()) // Spawn the handler. It will give the slot back when the connection // has been established. @@ -676,36 +687,37 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { - log.Debug(fmt.Sprintf("%v faild enc handshake: %v", c, err)) + log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) c.close(err) return } + clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) // For dialed connections, check that the remote public key matches. if dialDest != nil && c.id != dialDest.ID { c.close(DiscUnexpectedIdentity) - log.Debug(fmt.Sprintf("%v dialed identity mismatch, want %x", c, dialDest.ID[:8])) + clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) return } if err := srv.checkpoint(c, srv.posthandshake); err != nil { - log.Debug(fmt.Sprintf("%v failed checkpoint posthandshake: %v", c, err)) + clog.Trace("Rejected peer before protocol handshake", "err", err) c.close(err) return } // Run the protocol handshake phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { - log.Debug(fmt.Sprintf("%v failed proto handshake: %v", c, err)) + clog.Trace("Failed proto handshake", "err", err) c.close(err) return } if phs.ID != c.id { - log.Debug(fmt.Sprintf("%v wrong proto handshake identity: %x", c, phs.ID[:8])) + clog.Trace("Wrong devp2p handshake identity", "err", phs.ID) c.close(DiscUnexpectedIdentity) return } c.caps, c.name = phs.Caps, phs.Name if err := srv.checkpoint(c, srv.addpeer); err != nil { - log.Debug(fmt.Sprintf("%v failed checkpoint addpeer: %v", c, err)) + clog.Trace("Rejected peer", "err", err) c.close(err) return } @@ -713,6 +725,13 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod // launched by run. } +func truncateName(s string) string { + if len(s) > 20 { + return s[:20] + "..." + } + return s +} + // checkpoint sends the conn to run, which performs the // post-handshake checks for the stage (posthandshake, addpeer). func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { @@ -733,17 +752,13 @@ func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { // it waits until the Peer logic returns and removes // the peer. func (srv *Server) runPeer(p *Peer) { - log.Debug(fmt.Sprintf("Added %v", p)) - if srv.newPeerHook != nil { srv.newPeerHook(p) } - discreason := p.run() + remoteRequested, err := p.run() // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. - srv.delpeer <- p - - log.Debug(fmt.Sprintf("Removed %v (%v)", p, discreason)) + srv.delpeer <- peerDrop{p, err, remoteRequested} } // NodeInfo represents a short summary of the information known about the host.