|
|
|
@ -469,18 +469,18 @@ func (w *Whisper) Stop() error { |
|
|
|
|
|
|
|
|
|
// HandlePeer is called by the underlying P2P layer when the whisper sub-protocol
|
|
|
|
|
// connection is negotiated.
|
|
|
|
|
func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
func (w *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
// Create the new peer and start tracking it
|
|
|
|
|
whisperPeer := newPeer(wh, peer, rw) |
|
|
|
|
whisperPeer := newPeer(w, peer, rw) |
|
|
|
|
|
|
|
|
|
wh.peerMu.Lock() |
|
|
|
|
wh.peers[whisperPeer] = struct{}{} |
|
|
|
|
wh.peerMu.Unlock() |
|
|
|
|
w.peerMu.Lock() |
|
|
|
|
w.peers[whisperPeer] = struct{}{} |
|
|
|
|
w.peerMu.Unlock() |
|
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
|
wh.peerMu.Lock() |
|
|
|
|
delete(wh.peers, whisperPeer) |
|
|
|
|
wh.peerMu.Unlock() |
|
|
|
|
w.peerMu.Lock() |
|
|
|
|
delete(w.peers, whisperPeer) |
|
|
|
|
w.peerMu.Unlock() |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// Run the peer handshake and state updates
|
|
|
|
@ -490,11 +490,11 @@ func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
whisperPeer.start() |
|
|
|
|
defer whisperPeer.stop() |
|
|
|
|
|
|
|
|
|
return wh.runMessageLoop(whisperPeer, rw) |
|
|
|
|
return w.runMessageLoop(whisperPeer, rw) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
|
|
|
|
|
func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
func (w *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
for { |
|
|
|
|
// fetch the next packet
|
|
|
|
|
packet, err := rw.ReadMsg() |
|
|
|
@ -502,7 +502,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
log.Warn("message loop", "peer", p.peer.ID(), "err", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if packet.Size > wh.MaxMessageSize() { |
|
|
|
|
if packet.Size > w.MaxMessageSize() { |
|
|
|
|
log.Warn("oversized message received", "peer", p.peer.ID()) |
|
|
|
|
return errors.New("oversized message received") |
|
|
|
|
} |
|
|
|
@ -518,7 +518,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err) |
|
|
|
|
return errors.New("invalid envelope") |
|
|
|
|
} |
|
|
|
|
cached, err := wh.add(&envelope) |
|
|
|
|
cached, err := w.add(&envelope) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) |
|
|
|
|
return errors.New("invalid envelope") |
|
|
|
@ -537,17 +537,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) |
|
|
|
|
return errors.New("invalid direct message") |
|
|
|
|
} |
|
|
|
|
wh.postEvent(&envelope, true) |
|
|
|
|
w.postEvent(&envelope, true) |
|
|
|
|
} |
|
|
|
|
case p2pRequestCode: |
|
|
|
|
// Must be processed if mail server is implemented. Otherwise ignore.
|
|
|
|
|
if wh.mailServer != nil { |
|
|
|
|
if w.mailServer != nil { |
|
|
|
|
var request Envelope |
|
|
|
|
if err := packet.Decode(&request); err != nil { |
|
|
|
|
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) |
|
|
|
|
return errors.New("invalid p2p request") |
|
|
|
|
} |
|
|
|
|
wh.mailServer.DeliverMail(p, &request) |
|
|
|
|
w.mailServer.DeliverMail(p, &request) |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
// New message types might be implemented in the future versions of Whisper.
|
|
|
|
@ -561,29 +561,27 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { |
|
|
|
|
// add inserts a new envelope into the message pool to be distributed within the
|
|
|
|
|
// whisper network. It also inserts the envelope into the expiration pool at the
|
|
|
|
|
// appropriate time-stamp. In case of error, connection should be dropped.
|
|
|
|
|
func (wh *Whisper) add(envelope *Envelope) (bool, error) { |
|
|
|
|
func (w *Whisper) add(envelope *Envelope) (bool, error) { |
|
|
|
|
now := uint32(time.Now().Unix()) |
|
|
|
|
sent := envelope.Expiry - envelope.TTL |
|
|
|
|
|
|
|
|
|
if sent > now { |
|
|
|
|
if sent-SynchAllowance > now { |
|
|
|
|
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) |
|
|
|
|
} else { |
|
|
|
|
// recalculate PoW, adjusted for the time difference, plus one second for latency
|
|
|
|
|
envelope.calculatePoW(sent - now + 1) |
|
|
|
|
} |
|
|
|
|
// recalculate PoW, adjusted for the time difference, plus one second for latency
|
|
|
|
|
envelope.calculatePoW(sent - now + 1) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if envelope.Expiry < now { |
|
|
|
|
if envelope.Expiry+SynchAllowance*2 < now { |
|
|
|
|
return false, fmt.Errorf("very old message") |
|
|
|
|
} else { |
|
|
|
|
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) |
|
|
|
|
return false, nil // drop envelope without error
|
|
|
|
|
} |
|
|
|
|
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) |
|
|
|
|
return false, nil // drop envelope without error
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if uint32(envelope.size()) > wh.MaxMessageSize() { |
|
|
|
|
if uint32(envelope.size()) > w.MaxMessageSize() { |
|
|
|
|
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -598,36 +596,36 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { |
|
|
|
|
return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if envelope.PoW() < wh.MinPow() { |
|
|
|
|
if envelope.PoW() < w.MinPow() { |
|
|
|
|
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) |
|
|
|
|
return false, nil // drop envelope without error
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
hash := envelope.Hash() |
|
|
|
|
|
|
|
|
|
wh.poolMu.Lock() |
|
|
|
|
_, alreadyCached := wh.envelopes[hash] |
|
|
|
|
w.poolMu.Lock() |
|
|
|
|
_, alreadyCached := w.envelopes[hash] |
|
|
|
|
if !alreadyCached { |
|
|
|
|
wh.envelopes[hash] = envelope |
|
|
|
|
if wh.expirations[envelope.Expiry] == nil { |
|
|
|
|
wh.expirations[envelope.Expiry] = set.NewNonTS() |
|
|
|
|
w.envelopes[hash] = envelope |
|
|
|
|
if w.expirations[envelope.Expiry] == nil { |
|
|
|
|
w.expirations[envelope.Expiry] = set.NewNonTS() |
|
|
|
|
} |
|
|
|
|
if !wh.expirations[envelope.Expiry].Has(hash) { |
|
|
|
|
wh.expirations[envelope.Expiry].Add(hash) |
|
|
|
|
if !w.expirations[envelope.Expiry].Has(hash) { |
|
|
|
|
w.expirations[envelope.Expiry].Add(hash) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
wh.poolMu.Unlock() |
|
|
|
|
w.poolMu.Unlock() |
|
|
|
|
|
|
|
|
|
if alreadyCached { |
|
|
|
|
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) |
|
|
|
|
} else { |
|
|
|
|
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) |
|
|
|
|
wh.statsMu.Lock() |
|
|
|
|
wh.stats.memoryUsed += envelope.size() |
|
|
|
|
wh.statsMu.Unlock() |
|
|
|
|
wh.postEvent(envelope, false) // notify the local node about the new message
|
|
|
|
|
if wh.mailServer != nil { |
|
|
|
|
wh.mailServer.Archive(envelope) |
|
|
|
|
w.statsMu.Lock() |
|
|
|
|
w.stats.memoryUsed += envelope.size() |
|
|
|
|
w.statsMu.Unlock() |
|
|
|
|
w.postEvent(envelope, false) // notify the local node about the new message
|
|
|
|
|
if w.mailServer != nil { |
|
|
|
|
w.mailServer.Archive(envelope) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return true, nil |
|
|
|
@ -838,9 +836,8 @@ func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error |
|
|
|
|
// because it's a once in a session experience
|
|
|
|
|
derivedKey := pbkdf2.Key(key, nil, 65356, aesKeyLength, sha256.New) |
|
|
|
|
return derivedKey, nil |
|
|
|
|
} else { |
|
|
|
|
return nil, unknownVersionError(version) |
|
|
|
|
} |
|
|
|
|
return nil, unknownVersionError(version) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GenerateRandomID generates a random string, which is then returned to be used as a key id
|
|
|
|
|