|
|
|
@ -25,6 +25,8 @@ import ( |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/common" |
|
|
|
|
"github.com/ethereum/go-ethereum/common/mclock" |
|
|
|
|
"github.com/ethereum/go-ethereum/log" |
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/discover" |
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/discv5" |
|
|
|
@ -162,12 +164,18 @@ type Server struct { |
|
|
|
|
removestatic chan *discover.Node |
|
|
|
|
posthandshake chan *conn |
|
|
|
|
addpeer chan *conn |
|
|
|
|
delpeer chan *Peer |
|
|
|
|
delpeer chan peerDrop |
|
|
|
|
loopWG sync.WaitGroup // loop, listenLoop
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type peerOpFunc func(map[discover.NodeID]*Peer) |
|
|
|
|
|
|
|
|
|
type peerDrop struct { |
|
|
|
|
*Peer |
|
|
|
|
err error |
|
|
|
|
requested bool // true if signaled by the peer
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type connFlag int |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
@ -204,9 +212,9 @@ type transport interface { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *conn) String() string { |
|
|
|
|
s := c.flags.String() + " conn" |
|
|
|
|
s := c.flags.String() |
|
|
|
|
if (c.id != discover.NodeID{}) { |
|
|
|
|
s += fmt.Sprintf(" %x", c.id[:8]) |
|
|
|
|
s += " " + c.id.String() |
|
|
|
|
} |
|
|
|
|
s += " " + c.fd.RemoteAddr().String() |
|
|
|
|
return s |
|
|
|
@ -215,16 +223,16 @@ func (c *conn) String() string { |
|
|
|
|
func (f connFlag) String() string { |
|
|
|
|
s := "" |
|
|
|
|
if f&trustedConn != 0 { |
|
|
|
|
s += " trusted" |
|
|
|
|
s += "-trusted" |
|
|
|
|
} |
|
|
|
|
if f&dynDialedConn != 0 { |
|
|
|
|
s += " dyn dial" |
|
|
|
|
s += "-dyndial" |
|
|
|
|
} |
|
|
|
|
if f&staticDialedConn != 0 { |
|
|
|
|
s += " static dial" |
|
|
|
|
s += "-staticdial" |
|
|
|
|
} |
|
|
|
|
if f&inboundConn != 0 { |
|
|
|
|
s += " inbound" |
|
|
|
|
s += "-inbound" |
|
|
|
|
} |
|
|
|
|
if s != "" { |
|
|
|
|
s = s[1:] |
|
|
|
@ -288,26 +296,30 @@ func (srv *Server) Self() *discover.Node { |
|
|
|
|
srv.lock.Lock() |
|
|
|
|
defer srv.lock.Unlock() |
|
|
|
|
|
|
|
|
|
// If the server's not running, return an empty node
|
|
|
|
|
if !srv.running { |
|
|
|
|
return &discover.Node{IP: net.ParseIP("0.0.0.0")} |
|
|
|
|
} |
|
|
|
|
// If the node is running but discovery is off, manually assemble the node infos
|
|
|
|
|
if srv.ntab == nil { |
|
|
|
|
// Inbound connections disabled, use zero address
|
|
|
|
|
if srv.listener == nil { |
|
|
|
|
return srv.makeSelf(srv.listener, srv.ntab) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *discover.Node { |
|
|
|
|
// If the server's not running, return an empty node.
|
|
|
|
|
// If the node is running but discovery is off, manually assemble the node infos.
|
|
|
|
|
if ntab == nil { |
|
|
|
|
// Inbound connections disabled, use zero address.
|
|
|
|
|
if listener == nil { |
|
|
|
|
return &discover.Node{IP: net.ParseIP("0.0.0.0"), ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)} |
|
|
|
|
} |
|
|
|
|
// Otherwise inject the listener address too
|
|
|
|
|
addr := srv.listener.Addr().(*net.TCPAddr) |
|
|
|
|
addr := listener.Addr().(*net.TCPAddr) |
|
|
|
|
return &discover.Node{ |
|
|
|
|
ID: discover.PubkeyID(&srv.PrivateKey.PublicKey), |
|
|
|
|
IP: addr.IP, |
|
|
|
|
TCP: uint16(addr.Port), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Otherwise return the live node infos
|
|
|
|
|
return srv.ntab.Self() |
|
|
|
|
// Otherwise return the discovery node.
|
|
|
|
|
return ntab.Self() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Stop terminates the server and all active peer connections.
|
|
|
|
@ -336,7 +348,7 @@ func (srv *Server) Start() (err error) { |
|
|
|
|
return errors.New("server already running") |
|
|
|
|
} |
|
|
|
|
srv.running = true |
|
|
|
|
log.Info(fmt.Sprint("Starting Server")) |
|
|
|
|
log.Info("Starting P2P networking") |
|
|
|
|
|
|
|
|
|
// static fields
|
|
|
|
|
if srv.PrivateKey == nil { |
|
|
|
@ -350,7 +362,7 @@ func (srv *Server) Start() (err error) { |
|
|
|
|
} |
|
|
|
|
srv.quit = make(chan struct{}) |
|
|
|
|
srv.addpeer = make(chan *conn) |
|
|
|
|
srv.delpeer = make(chan *Peer) |
|
|
|
|
srv.delpeer = make(chan peerDrop) |
|
|
|
|
srv.posthandshake = make(chan *conn) |
|
|
|
|
srv.addstatic = make(chan *discover.Node) |
|
|
|
|
srv.removestatic = make(chan *discover.Node) |
|
|
|
@ -398,7 +410,7 @@ func (srv *Server) Start() (err error) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if srv.NoDial && srv.ListenAddr == "" { |
|
|
|
|
log.Warn(fmt.Sprint("I will be kind-of useless, neither dialing nor listening.")) |
|
|
|
|
log.Warn("P2P server will be useless, neither dialing nor listening") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
srv.loopWG.Add(1) |
|
|
|
@ -466,7 +478,7 @@ func (srv *Server) run(dialstate dialer) { |
|
|
|
|
i := 0 |
|
|
|
|
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { |
|
|
|
|
t := ts[i] |
|
|
|
|
log.Trace(fmt.Sprint("new task:", t)) |
|
|
|
|
log.Trace("New dial task", "task", t) |
|
|
|
|
go func() { t.Do(srv); taskdone <- t }() |
|
|
|
|
runningTasks = append(runningTasks, t) |
|
|
|
|
} |
|
|
|
@ -489,19 +501,18 @@ running: |
|
|
|
|
select { |
|
|
|
|
case <-srv.quit: |
|
|
|
|
// The server was stopped. Run the cleanup logic.
|
|
|
|
|
log.Trace(fmt.Sprint("<-quit: spinning down")) |
|
|
|
|
break running |
|
|
|
|
case n := <-srv.addstatic: |
|
|
|
|
// This channel is used by AddPeer to add to the
|
|
|
|
|
// ephemeral static peer list. Add it to the dialer,
|
|
|
|
|
// it will keep the node connected.
|
|
|
|
|
log.Trace(fmt.Sprint("<-addstatic:", n)) |
|
|
|
|
log.Debug("Adding static node", "node", n) |
|
|
|
|
dialstate.addStatic(n) |
|
|
|
|
case n := <-srv.removestatic: |
|
|
|
|
// This channel is used by RemovePeer to send a
|
|
|
|
|
// disconnect request to a peer and begin the
|
|
|
|
|
// stop keeping the node connected
|
|
|
|
|
log.Trace(fmt.Sprint("<-removestatic:", n)) |
|
|
|
|
log.Debug("Removing static node", "node", n) |
|
|
|
|
dialstate.removeStatic(n) |
|
|
|
|
if p, ok := peers[n.ID]; ok { |
|
|
|
|
p.Disconnect(DiscRequested) |
|
|
|
@ -514,7 +525,7 @@ running: |
|
|
|
|
// A task got done. Tell dialstate about it so it
|
|
|
|
|
// can update its state and remove it from the active
|
|
|
|
|
// tasks list.
|
|
|
|
|
log.Trace(fmt.Sprint("<-taskdone:", t)) |
|
|
|
|
log.Trace("Dial task done", "task", t) |
|
|
|
|
dialstate.taskDone(t, time.Now()) |
|
|
|
|
delTask(t) |
|
|
|
|
case c := <-srv.posthandshake: |
|
|
|
@ -524,19 +535,17 @@ running: |
|
|
|
|
// Ensure that the trusted flag is set before checking against MaxPeers.
|
|
|
|
|
c.flags |= trustedConn |
|
|
|
|
} |
|
|
|
|
log.Trace(fmt.Sprint("<-posthandshake:", c)) |
|
|
|
|
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
|
|
|
|
|
c.cont <- srv.encHandshakeChecks(peers, c) |
|
|
|
|
case c := <-srv.addpeer: |
|
|
|
|
// At this point the connection is past the protocol handshake.
|
|
|
|
|
// Its capabilities are known and the remote identity is verified.
|
|
|
|
|
log.Trace(fmt.Sprint("<-addpeer:", c)) |
|
|
|
|
err := srv.protoHandshakeChecks(peers, c) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Trace(fmt.Sprintf("Not adding %v as peer: %v", c, err)) |
|
|
|
|
} else { |
|
|
|
|
if err == nil { |
|
|
|
|
// The handshakes are done and it passed all checks.
|
|
|
|
|
p := newPeer(c, srv.Protocols) |
|
|
|
|
name := truncateName(c.name) |
|
|
|
|
log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) |
|
|
|
|
peers[c.id] = p |
|
|
|
|
go srv.runPeer(p) |
|
|
|
|
} |
|
|
|
@ -544,13 +553,16 @@ running: |
|
|
|
|
// dial tasks complete after the peer has been added or
|
|
|
|
|
// discarded. Unblock the task last.
|
|
|
|
|
c.cont <- err |
|
|
|
|
case p := <-srv.delpeer: |
|
|
|
|
case pd := <-srv.delpeer: |
|
|
|
|
// A peer disconnected.
|
|
|
|
|
log.Trace(fmt.Sprint("<-delpeer:", p)) |
|
|
|
|
delete(peers, p.ID()) |
|
|
|
|
d := common.PrettyDuration(mclock.Now() - pd.created) |
|
|
|
|
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err) |
|
|
|
|
delete(peers, pd.ID()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
log.Trace("P2P networking is spinning down") |
|
|
|
|
|
|
|
|
|
// Terminate discovery. If there is a running lookup it will terminate soon.
|
|
|
|
|
if srv.ntab != nil { |
|
|
|
|
srv.ntab.Close() |
|
|
|
@ -565,10 +577,9 @@ running: |
|
|
|
|
// Wait for peers to shut down. Pending connections and tasks are
|
|
|
|
|
// not handled here and will terminate soon-ish because srv.quit
|
|
|
|
|
// is closed.
|
|
|
|
|
log.Trace(fmt.Sprintf("ignoring %d pending tasks at spindown", len(runningTasks))) |
|
|
|
|
for len(peers) > 0 { |
|
|
|
|
p := <-srv.delpeer |
|
|
|
|
log.Trace(fmt.Sprint("<-delpeer (spindown):", p)) |
|
|
|
|
p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks)) |
|
|
|
|
delete(peers, p.ID()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -604,7 +615,7 @@ type tempError interface { |
|
|
|
|
// inbound connections.
|
|
|
|
|
func (srv *Server) listenLoop() { |
|
|
|
|
defer srv.loopWG.Done() |
|
|
|
|
log.Info(fmt.Sprint("Listening on", srv.listener.Addr())) |
|
|
|
|
log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) |
|
|
|
|
|
|
|
|
|
// This channel acts as a semaphore limiting
|
|
|
|
|
// active inbound connections that are lingering pre-handshake.
|
|
|
|
@ -629,10 +640,10 @@ func (srv *Server) listenLoop() { |
|
|
|
|
for { |
|
|
|
|
fd, err = srv.listener.Accept() |
|
|
|
|
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { |
|
|
|
|
log.Debug(fmt.Sprintf("Temporary read error: %v", err)) |
|
|
|
|
log.Debug("Temporary read error", "err", err) |
|
|
|
|
continue |
|
|
|
|
} else if err != nil { |
|
|
|
|
log.Debug(fmt.Sprintf("Read error: %v", err)) |
|
|
|
|
log.Debug("Read error", "err", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
@ -641,7 +652,7 @@ func (srv *Server) listenLoop() { |
|
|
|
|
// Reject connections that do not match NetRestrict.
|
|
|
|
|
if srv.NetRestrict != nil { |
|
|
|
|
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { |
|
|
|
|
log.Debug(fmt.Sprintf("Rejected conn %v because it is not whitelisted in NetRestrict", fd.RemoteAddr())) |
|
|
|
|
log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) |
|
|
|
|
fd.Close() |
|
|
|
|
slots <- struct{}{} |
|
|
|
|
continue |
|
|
|
@ -649,7 +660,7 @@ func (srv *Server) listenLoop() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fd = newMeteredConn(fd, true) |
|
|
|
|
log.Debug(fmt.Sprintf("Accepted conn %v", fd.RemoteAddr())) |
|
|
|
|
log.Trace("Accepted connection", "addr", fd.RemoteAddr()) |
|
|
|
|
|
|
|
|
|
// Spawn the handler. It will give the slot back when the connection
|
|
|
|
|
// has been established.
|
|
|
|
@ -676,36 +687,37 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod |
|
|
|
|
// Run the encryption handshake.
|
|
|
|
|
var err error |
|
|
|
|
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { |
|
|
|
|
log.Debug(fmt.Sprintf("%v faild enc handshake: %v", c, err)) |
|
|
|
|
log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) |
|
|
|
|
c.close(err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) |
|
|
|
|
// For dialed connections, check that the remote public key matches.
|
|
|
|
|
if dialDest != nil && c.id != dialDest.ID { |
|
|
|
|
c.close(DiscUnexpectedIdentity) |
|
|
|
|
log.Debug(fmt.Sprintf("%v dialed identity mismatch, want %x", c, dialDest.ID[:8])) |
|
|
|
|
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if err := srv.checkpoint(c, srv.posthandshake); err != nil { |
|
|
|
|
log.Debug(fmt.Sprintf("%v failed checkpoint posthandshake: %v", c, err)) |
|
|
|
|
clog.Trace("Rejected peer before protocol handshake", "err", err) |
|
|
|
|
c.close(err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// Run the protocol handshake
|
|
|
|
|
phs, err := c.doProtoHandshake(srv.ourHandshake) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Debug(fmt.Sprintf("%v failed proto handshake: %v", c, err)) |
|
|
|
|
clog.Trace("Failed proto handshake", "err", err) |
|
|
|
|
c.close(err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if phs.ID != c.id { |
|
|
|
|
log.Debug(fmt.Sprintf("%v wrong proto handshake identity: %x", c, phs.ID[:8])) |
|
|
|
|
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID) |
|
|
|
|
c.close(DiscUnexpectedIdentity) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
c.caps, c.name = phs.Caps, phs.Name |
|
|
|
|
if err := srv.checkpoint(c, srv.addpeer); err != nil { |
|
|
|
|
log.Debug(fmt.Sprintf("%v failed checkpoint addpeer: %v", c, err)) |
|
|
|
|
clog.Trace("Rejected peer", "err", err) |
|
|
|
|
c.close(err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -713,6 +725,13 @@ func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod |
|
|
|
|
// launched by run.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func truncateName(s string) string { |
|
|
|
|
if len(s) > 20 { |
|
|
|
|
return s[:20] + "..." |
|
|
|
|
} |
|
|
|
|
return s |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// checkpoint sends the conn to run, which performs the
|
|
|
|
|
// post-handshake checks for the stage (posthandshake, addpeer).
|
|
|
|
|
func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { |
|
|
|
@ -733,17 +752,13 @@ func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error { |
|
|
|
|
// it waits until the Peer logic returns and removes
|
|
|
|
|
// the peer.
|
|
|
|
|
func (srv *Server) runPeer(p *Peer) { |
|
|
|
|
log.Debug(fmt.Sprintf("Added %v", p)) |
|
|
|
|
|
|
|
|
|
if srv.newPeerHook != nil { |
|
|
|
|
srv.newPeerHook(p) |
|
|
|
|
} |
|
|
|
|
discreason := p.run() |
|
|
|
|
remoteRequested, err := p.run() |
|
|
|
|
// Note: run waits for existing peers to be sent on srv.delpeer
|
|
|
|
|
// before returning, so this send should not select on srv.quit.
|
|
|
|
|
srv.delpeer <- p |
|
|
|
|
|
|
|
|
|
log.Debug(fmt.Sprintf("Removed %v (%v)", p, discreason)) |
|
|
|
|
srv.delpeer <- peerDrop{p, err, remoteRequested} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NodeInfo represents a short summary of the information known about the host.
|
|
|
|
|