From 7d5ff770e22a3791c0f9c2794a19f59ca2756b33 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 19 Aug 2015 14:11:12 +0200 Subject: [PATCH 1/3] p2p/discover: continue reading after temporary errors Might solve #1579 --- p2p/discover/udp.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 008e63937d..6aefb68f7e 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -458,6 +458,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, return packet, nil } +type tempError interface { + Temporary() bool +} + // readLoop runs in its own goroutine. it handles incoming UDP packets. func (t *udp) readLoop() { defer t.conn.Close() @@ -467,7 +471,13 @@ func (t *udp) readLoop() { buf := make([]byte, 1280) for { nbytes, from, err := t.conn.ReadFromUDP(buf) - if err != nil { + if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { + // Ignore temporary read errors. + glog.V(logger.Debug).Infof("Temporary read error: %v", err) + continue + } else if err != nil { + // Shut down the loop for permanent errors. + glog.V(logger.Debug).Infof("Read error: %v", err) return } t.handlePacket(from, buf[:nbytes]) From edccc7ae3430836141b803c252f26bf1ef98d185 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 19 Aug 2015 14:35:01 +0200 Subject: [PATCH 2/3] p2p: continue listening after temporary errors --- p2p/server.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index 7351a26544..d8be853230 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -542,6 +542,10 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) } } +type tempError interface { + Temporary() bool +} + // listenLoop runs in its own goroutine and accepts // inbound connections. 
func (srv *Server) listenLoop() { @@ -561,16 +565,31 @@ func (srv *Server) listenLoop() { } for { + // Wait for a handshake slot before accepting. <-slots - fd, err := srv.listener.Accept() - if err != nil { - return + + var ( + fd net.Conn + err error + ) + for { + fd, err = srv.listener.Accept() + if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { + glog.V(logger.Debug).Infof("Temporary read error: %v", err) + continue + } else if err != nil { + glog.V(logger.Debug).Infof("Read error: %v", err) + return + } + break } - mfd := newMeteredConn(fd, true) + fd = newMeteredConn(fd, true) + glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr()) - glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr()) + // Spawn the handler. It will give the slot back when the connection + // has been established. go func() { - srv.setupConn(mfd, inboundConn, nil) + srv.setupConn(fd, inboundConn, nil) slots <- struct{}{} }() } From dd54fef89888372ab5961c1b5a6ac917fc47d49c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 17 Aug 2015 11:27:41 +0200 Subject: [PATCH 3/3] p2p/discover: don't attempt to replace nodes that are being replaced PR #1621 changed Table locking so the mutex is not held while a contested node is being pinged. If multiple nodes ping the local node during this time window, multiple ping packets will be sent to the contested node. The changes in this commit prevent multiple packets by tracking whether the node is being replaced. --- p2p/discover/node.go | 4 ++++ p2p/discover/table.go | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/p2p/discover/node.go b/p2p/discover/node.go index b6956e197d..a14f294249 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -48,6 +48,10 @@ type Node struct { // In those tests, the content of sha will not actually correspond // with ID. 
sha common.Hash + + // whether this node is currently being pinged in order to replace + // it in a bucket + contested bool } func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index b077f010c7..972bc10777 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { func (tab *Table) add(new *Node) { b := tab.buckets[logdist(tab.self.sha, new.sha)] tab.mutex.Lock() + defer tab.mutex.Unlock() if b.bump(new) { - tab.mutex.Unlock() return } var oldest *Node if len(b.entries) == bucketSize { oldest = b.entries[bucketSize-1] + if oldest.contested { + // The node is already being replaced, don't attempt + // to replace it. + return + } + oldest.contested = true // Let go of the mutex so other goroutines can access // the table while we ping the least recently active node. tab.mutex.Unlock() - if err := tab.ping(oldest.ID, oldest.addr()); err == nil { + err := tab.ping(oldest.ID, oldest.addr()) + tab.mutex.Lock() + oldest.contested = false + if err == nil { // The node responded, don't replace it. return } - tab.mutex.Lock() } added := b.replace(new, oldest) - tab.mutex.Unlock() if added && tab.nodeAddedHook != nil { tab.nodeAddedHook(new) }