|
|
|
@ -4,106 +4,155 @@ import ( |
|
|
|
|
"fmt" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/common" |
|
|
|
|
"github.com/ethereum/go-ethereum/p2p" |
|
|
|
|
"github.com/ethereum/go-ethereum/rlp" |
|
|
|
|
"gopkg.in/fatih/set.v0" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// peer represents a whisper protocol peer connection.
|
|
|
|
|
type peer struct { |
|
|
|
|
host *Whisper |
|
|
|
|
peer *p2p.Peer |
|
|
|
|
ws p2p.MsgReadWriter |
|
|
|
|
|
|
|
|
|
// XXX Eventually this is going to reach exceptional large space. We need an expiry here
|
|
|
|
|
known *set.Set |
|
|
|
|
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
|
|
|
|
|
|
|
|
|
|
quit chan struct{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { |
|
|
|
|
return &peer{host, p, ws, set.New(), make(chan struct{})} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self *peer) init() error { |
|
|
|
|
if err := self.handleStatus(); err != nil { |
|
|
|
|
return err |
|
|
|
|
// newPeer creates and initializes a new whisper peer connection, returning either
|
|
|
|
|
// the newly constructed link or a failure reason.
|
|
|
|
|
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { |
|
|
|
|
p := &peer{ |
|
|
|
|
host: host, |
|
|
|
|
peer: remote, |
|
|
|
|
ws: rw, |
|
|
|
|
known: set.New(), |
|
|
|
|
quit: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
if err := p.handshake(); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return p, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// start initiates the peer updater, periodically broadcasting the whisper packets
|
|
|
|
|
// into the network.
|
|
|
|
|
func (self *peer) start() { |
|
|
|
|
go self.update() |
|
|
|
|
self.peer.Debugln("whisper started") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// stop terminates the peer updater, stopping message forwarding to it.
|
|
|
|
|
func (self *peer) stop() { |
|
|
|
|
close(self.quit) |
|
|
|
|
self.peer.Debugln("whisper stopped") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
close(self.quit) |
|
|
|
|
// handshake sends the protocol initiation status message to the remote peer and
|
|
|
|
|
// verifies the remote status too.
|
|
|
|
|
func (self *peer) handshake() error { |
|
|
|
|
// Send own status message, fetch remote one
|
|
|
|
|
if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
packet, err := self.ws.ReadMsg() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if packet.Code != statusCode { |
|
|
|
|
return fmt.Errorf("peer sent %x before status packet", packet.Code) |
|
|
|
|
} |
|
|
|
|
// Decode the rest of the status packet and verify protocol match
|
|
|
|
|
s := rlp.NewStream(packet.Payload) |
|
|
|
|
if _, err := s.List(); err != nil { |
|
|
|
|
return fmt.Errorf("bad status message: %v", err) |
|
|
|
|
} |
|
|
|
|
peerVersion, err := s.Uint() |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("bad status message: %v", err) |
|
|
|
|
} |
|
|
|
|
if peerVersion != protocolVersion { |
|
|
|
|
return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) |
|
|
|
|
} |
|
|
|
|
return packet.Discard() // ignore anything after protocol version
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// update executes periodic operations on the peer, including message transmission
|
|
|
|
|
// and expiration.
|
|
|
|
|
func (self *peer) update() { |
|
|
|
|
relay := time.NewTicker(300 * time.Millisecond) |
|
|
|
|
out: |
|
|
|
|
// Start the tickers for the updates
|
|
|
|
|
expire := time.NewTicker(expirationTicks) |
|
|
|
|
transmit := time.NewTicker(transmissionTicks) |
|
|
|
|
|
|
|
|
|
// Loop and transmit until termination is requested
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-relay.C: |
|
|
|
|
err := self.broadcast(self.host.envelopes()) |
|
|
|
|
if err != nil { |
|
|
|
|
self.peer.Infoln("broadcast err:", err) |
|
|
|
|
break out |
|
|
|
|
case <-expire.C: |
|
|
|
|
self.expire() |
|
|
|
|
|
|
|
|
|
case <-transmit.C: |
|
|
|
|
if err := self.broadcast(); err != nil { |
|
|
|
|
self.peer.Infoln("broadcast failed:", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case <-self.quit: |
|
|
|
|
break out |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self *peer) broadcast(envelopes []*Envelope) error { |
|
|
|
|
envs := make([]*Envelope, 0, len(envelopes)) |
|
|
|
|
for _, env := range envelopes { |
|
|
|
|
if !self.known.Has(env.Hash()) { |
|
|
|
|
envs = append(envs, env) |
|
|
|
|
self.known.Add(env.Hash()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if len(envs) > 0 { |
|
|
|
|
if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
// mark marks an envelope known to the peer so that it won't be sent back.
|
|
|
|
|
func (self *peer) mark(envelope *Envelope) { |
|
|
|
|
self.known.Add(envelope.Hash()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self *peer) addKnown(envelope *Envelope) { |
|
|
|
|
self.known.Add(envelope.Hash()) |
|
|
|
|
// marked checks if an envelope is already known to the remote peer.
|
|
|
|
|
func (self *peer) marked(envelope *Envelope) bool { |
|
|
|
|
return self.known.Has(envelope.Hash()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (self *peer) handleStatus() error { |
|
|
|
|
ws := self.ws |
|
|
|
|
if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
msg, err := ws.ReadMsg() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if msg.Code != statusMsg { |
|
|
|
|
return fmt.Errorf("peer send %x before status msg", msg.Code) |
|
|
|
|
// expire iterates over all the known envelopes in the host and removes all
|
|
|
|
|
// expired (unknown) ones from the known list.
|
|
|
|
|
func (self *peer) expire() { |
|
|
|
|
// Assemble the list of available envelopes
|
|
|
|
|
available := set.NewNonTS() |
|
|
|
|
for _, envelope := range self.host.envelopes() { |
|
|
|
|
available.Add(envelope.Hash()) |
|
|
|
|
} |
|
|
|
|
s := rlp.NewStream(msg.Payload) |
|
|
|
|
if _, err := s.List(); err != nil { |
|
|
|
|
return fmt.Errorf("bad status message: %v", err) |
|
|
|
|
// Cross reference availability with known status
|
|
|
|
|
unmark := make(map[common.Hash]struct{}) |
|
|
|
|
self.known.Each(func(v interface{}) bool { |
|
|
|
|
if !available.Has(v.(common.Hash)) { |
|
|
|
|
unmark[v.(common.Hash)] = struct{}{} |
|
|
|
|
} |
|
|
|
|
return true |
|
|
|
|
}) |
|
|
|
|
// Dump all known but unavailable
|
|
|
|
|
for hash, _ := range unmark { |
|
|
|
|
self.known.Remove(hash) |
|
|
|
|
} |
|
|
|
|
pv, err := s.Uint() |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("bad status message: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// broadcast iterates over the collection of envelopes and transmits yet unknown
|
|
|
|
|
// ones over the network.
|
|
|
|
|
func (self *peer) broadcast() error { |
|
|
|
|
// Fetch the envelopes and collect the unknown ones
|
|
|
|
|
envelopes := self.host.envelopes() |
|
|
|
|
transmit := make([]*Envelope, 0, len(envelopes)) |
|
|
|
|
for _, envelope := range envelopes { |
|
|
|
|
if !self.marked(envelope) { |
|
|
|
|
transmit = append(transmit, envelope) |
|
|
|
|
self.mark(envelope) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if pv != protocolVersion { |
|
|
|
|
return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) |
|
|
|
|
// Transmit the unknown batch (potentially empty)
|
|
|
|
|
if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return msg.Discard() // ignore anything after protocol version
|
|
|
|
|
self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)") |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|