Merge pull request #1689 from fjl/discover-ignore-temp-errors

p2p, p2p/discover: small fixes
pull/1318/head
Jeffrey Wilcke 9 years ago
commit 61a6911eeb
  1. 4
      p2p/discover/node.go
  2. 15
      p2p/discover/table.go
  3. 12
      p2p/discover/udp.go
  4. 29
      p2p/server.go

@ -48,6 +48,10 @@ type Node struct {
// In those tests, the content of sha will not actually correspond // In those tests, the content of sha will not actually correspond
// with ID. // with ID.
sha common.Hash sha common.Hash
// whether this node is currently being pinged in order to replace
// it in a bucket
contested bool
} }
func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node { func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node {

@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
func (tab *Table) add(new *Node) { func (tab *Table) add(new *Node) {
b := tab.buckets[logdist(tab.self.sha, new.sha)] b := tab.buckets[logdist(tab.self.sha, new.sha)]
tab.mutex.Lock() tab.mutex.Lock()
defer tab.mutex.Unlock()
if b.bump(new) { if b.bump(new) {
tab.mutex.Unlock()
return return
} }
var oldest *Node var oldest *Node
if len(b.entries) == bucketSize { if len(b.entries) == bucketSize {
oldest = b.entries[bucketSize-1] oldest = b.entries[bucketSize-1]
if oldest.contested {
// The node is already being replaced, don't attempt
// to replace it.
return
}
oldest.contested = true
// Let go of the mutex so other goroutines can access // Let go of the mutex so other goroutines can access
// the table while we ping the least recently active node. // the table while we ping the least recently active node.
tab.mutex.Unlock() tab.mutex.Unlock()
if err := tab.ping(oldest.ID, oldest.addr()); err == nil { err := tab.ping(oldest.ID, oldest.addr())
tab.mutex.Lock()
oldest.contested = false
if err == nil {
// The node responded, don't replace it. // The node responded, don't replace it.
return return
} }
tab.mutex.Lock()
} }
added := b.replace(new, oldest) added := b.replace(new, oldest)
tab.mutex.Unlock()
if added && tab.nodeAddedHook != nil { if added && tab.nodeAddedHook != nil {
tab.nodeAddedHook(new) tab.nodeAddedHook(new)
} }

@ -458,6 +458,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte,
return packet, nil return packet, nil
} }
type tempError interface {
Temporary() bool
}
// readLoop runs in its own goroutine. it handles incoming UDP packets. // readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop() { func (t *udp) readLoop() {
defer t.conn.Close() defer t.conn.Close()
@ -467,7 +471,13 @@ func (t *udp) readLoop() {
buf := make([]byte, 1280) buf := make([]byte, 1280)
for { for {
nbytes, from, err := t.conn.ReadFromUDP(buf) nbytes, from, err := t.conn.ReadFromUDP(buf)
if err != nil { if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
// Ignore temporary read errors.
glog.V(logger.Debug).Infof("Temporary read error: %v", err)
continue
} else if err != nil {
// Shut down the loop for permament errors.
glog.V(logger.Debug).Infof("Read error: %v", err)
return return
} }
t.handlePacket(from, buf[:nbytes]) t.handlePacket(from, buf[:nbytes])

@ -542,6 +542,10 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn)
} }
} }
type tempError interface {
Temporary() bool
}
// listenLoop runs in its own goroutine and accepts // listenLoop runs in its own goroutine and accepts
// inbound connections. // inbound connections.
func (srv *Server) listenLoop() { func (srv *Server) listenLoop() {
@ -561,16 +565,31 @@ func (srv *Server) listenLoop() {
} }
for { for {
// Wait for a handshake slot before accepting.
<-slots <-slots
fd, err := srv.listener.Accept()
if err != nil { var (
fd net.Conn
err error
)
for {
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
glog.V(logger.Debug).Infof("Temporary read error: %v", err)
continue
} else if err != nil {
glog.V(logger.Debug).Infof("Read error: %v", err)
return return
} }
mfd := newMeteredConn(fd, true) break
}
fd = newMeteredConn(fd, true)
glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr()) // Spawn the handler. It will give the slot back when the connection
// has been established.
go func() { go func() {
srv.setupConn(mfd, inboundConn, nil) srv.setupConn(fd, inboundConn, nil)
slots <- struct{}{} slots <- struct{}{}
}() }()
} }

Loading…
Cancel
Save