|
|
|
@ -5,6 +5,8 @@ import ( |
|
|
|
|
"github.com/ethereum/ethwire-go" |
|
|
|
|
"log" |
|
|
|
|
"net" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type Peer struct { |
|
|
|
@ -16,8 +18,12 @@ type Peer struct { |
|
|
|
|
outputQueue chan *ethwire.InOutMsg |
|
|
|
|
// Quit channel
|
|
|
|
|
quit chan bool |
|
|
|
|
|
|
|
|
|
inbound bool // Determines whether it's an inbound or outbound peer
|
|
|
|
|
// Determines whether it's an inbound or outbound peer
|
|
|
|
|
inbound bool |
|
|
|
|
// Flag for checking the peer's connectivity state
|
|
|
|
|
connected int32 |
|
|
|
|
disconnect int32 |
|
|
|
|
lastSend time.Time |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { |
|
|
|
@ -27,12 +33,57 @@ func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { |
|
|
|
|
server: server, |
|
|
|
|
conn: conn, |
|
|
|
|
inbound: inbound, |
|
|
|
|
disconnect: 0, |
|
|
|
|
connected: 1, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewOutboundPeer(addr string, server *Server) *Peer { |
|
|
|
|
p := &Peer{ |
|
|
|
|
outputQueue: make(chan *ethwire.InOutMsg, 1), // Buffered chan of 1 is enough
|
|
|
|
|
quit: make(chan bool), |
|
|
|
|
server: server, |
|
|
|
|
inbound: false, |
|
|
|
|
connected: 0, |
|
|
|
|
disconnect: 1, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Set up the connection in another goroutine so we don't block the main thread
|
|
|
|
|
go func() { |
|
|
|
|
conn, err := net.Dial("tcp", addr) |
|
|
|
|
if err != nil { |
|
|
|
|
p.Stop() |
|
|
|
|
} |
|
|
|
|
p.conn = conn |
|
|
|
|
|
|
|
|
|
// Atomically set the connection state
|
|
|
|
|
atomic.StoreInt32(&p.connected, 1) |
|
|
|
|
atomic.StoreInt32(&p.disconnect, 0) |
|
|
|
|
|
|
|
|
|
log.Println("Connected to peer ::", conn.RemoteAddr()) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
return p |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Outputs any RLP encoded data to the peer
|
|
|
|
|
func (p *Peer) QueueMessage(msg *ethwire.InOutMsg) { |
|
|
|
|
p.outputQueue <- msg //ethwire.InOutMsg{MsgType: msgType, Nonce: ethutil.RandomUint64(), Data: data}
|
|
|
|
|
p.outputQueue <- msg |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *Peer) writeMessage(msg *ethwire.InOutMsg) { |
|
|
|
|
// Ignore the write if we're not connected
|
|
|
|
|
if atomic.LoadInt32(&p.connected) != 1 { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err := ethwire.WriteMessage(p.conn, msg) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Println("Can't send message:", err) |
|
|
|
|
// Stop the client if there was an error writing to it
|
|
|
|
|
p.Stop() |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Outbound message handler. Outbound messages are handled here
|
|
|
|
@ -42,28 +93,32 @@ out: |
|
|
|
|
select { |
|
|
|
|
// Main message queue. All outbound messages are processed through here
|
|
|
|
|
case msg := <-p.outputQueue: |
|
|
|
|
// TODO Message checking and handle accordingly
|
|
|
|
|
err := ethwire.WriteMessage(p.conn, msg) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Println(err) |
|
|
|
|
|
|
|
|
|
// Stop the client if there was an error writing to it
|
|
|
|
|
p.Stop() |
|
|
|
|
} |
|
|
|
|
p.writeMessage(msg) |
|
|
|
|
|
|
|
|
|
p.lastSend = time.Now() |
|
|
|
|
// Break out of the for loop if a quit message is posted
|
|
|
|
|
case <-p.quit: |
|
|
|
|
break out |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
clean: |
|
|
|
|
// This loop is for draining the output queue and anybody waiting for us
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <- p.outputQueue: |
|
|
|
|
// TODO
|
|
|
|
|
default: |
|
|
|
|
break clean |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Inbound handler. Inbound messages are received here and passed to the appropriate methods
|
|
|
|
|
func (p *Peer) HandleInbound() { |
|
|
|
|
defer p.Stop() |
|
|
|
|
|
|
|
|
|
out: |
|
|
|
|
for { |
|
|
|
|
for atomic.LoadInt32(&p.disconnect) == 0 { |
|
|
|
|
// Wait for a message from the peer
|
|
|
|
|
msg, err := ethwire.ReadMessage(p.conn) |
|
|
|
|
if err != nil { |
|
|
|
@ -90,8 +145,7 @@ out: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Notify the out handler we're quiting
|
|
|
|
|
p.quit <- true |
|
|
|
|
p.Stop() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *Peer) Start() { |
|
|
|
@ -111,9 +165,16 @@ func (p *Peer) Start() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *Peer) Stop() { |
|
|
|
|
p.conn.Close() |
|
|
|
|
if atomic.AddInt32(&p.disconnect, 1) != 1 { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
close(p.quit) |
|
|
|
|
if atomic.LoadInt32(&p.connected) != 0 { |
|
|
|
|
p.conn.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
p.quit <- true |
|
|
|
|
log.Println("Peer shutdown") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *Peer) pushVersionAck() error { |
|
|
|
|