whisper: expiry refactoring (#3706)

pull/3632/merge
gluk256 8 years ago committed by Felix Lange
parent 357732a840
commit 11539030cd
  1. 8
      whisper/whisperv5/api.go
  2. 4
      whisper/whisperv5/filter.go
  3. 10
      whisper/whisperv5/peer.go
  4. 2
      whisper/whisperv5/peer_test.go
  5. 72
      whisper/whisperv5/whisper.go

@ -64,6 +64,14 @@ func (api *PublicWhisperAPI) Version() (hexutil.Uint, error) {
return hexutil.Uint(api.whisper.Version()), nil return hexutil.Uint(api.whisper.Version()), nil
} }
// Stats returns the Whisper statistics for diagnostics.
func (api *PublicWhisperAPI) Stats() (string, error) {
if api.whisper == nil {
return "", whisperOffLineErr
}
return api.whisper.Stats(), nil
}
// MarkPeerTrusted marks specific peer trusted, which will allow it // MarkPeerTrusted marks specific peer trusted, which will allow it
// to send historic (expired) messages. // to send historic (expired) messages.
func (api *PublicWhisperAPI) MarkPeerTrusted(peerID hexutil.Bytes) error { func (api *PublicWhisperAPI) MarkPeerTrusted(peerID hexutil.Bytes) error {

@ -18,7 +18,7 @@ package whisperv5
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/rand" crand "crypto/rand"
"fmt" "fmt"
"sync" "sync"
@ -55,7 +55,7 @@ func NewFilters(w *Whisper) *Filters {
func (fs *Filters) generateRandomID() (id string, err error) { func (fs *Filters) generateRandomID() (id string, err error) {
buf := make([]byte, 20) buf := make([]byte, 20)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
_, err = rand.Read(buf) _, err = crand.Read(buf)
if err != nil { if err != nil {
continue continue
} }

@ -133,20 +133,14 @@ func (peer *Peer) marked(envelope *Envelope) bool {
// expire iterates over all the known envelopes in the host and removes all // expire iterates over all the known envelopes in the host and removes all
// expired (unknown) ones from the known list. // expired (unknown) ones from the known list.
func (peer *Peer) expire() { func (peer *Peer) expire() {
// Assemble the list of available envelopes
available := set.NewNonTS()
for _, envelope := range peer.host.Envelopes() {
available.Add(envelope.Hash())
}
// Cross reference availability with known status
unmark := make(map[common.Hash]struct{}) unmark := make(map[common.Hash]struct{})
peer.known.Each(func(v interface{}) bool { peer.known.Each(func(v interface{}) bool {
if !available.Has(v.(common.Hash)) { if !peer.host.isEnvelopeCached(v.(common.Hash)) {
unmark[v.(common.Hash)] = struct{}{} unmark[v.(common.Hash)] = struct{}{}
} }
return true return true
}) })
// Dump all known but unavailable // Dump all known but no longer cached
for hash := range unmark { for hash := range unmark {
peer.known.Remove(hash) peer.known.Remove(hash)
} }

@ -107,8 +107,6 @@ func TestSimulation(t *testing.T) {
} }
func initialize(t *testing.T) { func initialize(t *testing.T) {
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat())))
var err error var err error
ip := net.IPv4(127, 0, 0, 1) ip := net.IPv4(127, 0, 0, 1)
port0 := 30303 port0 := 30303

@ -35,6 +35,12 @@ import (
set "gopkg.in/fatih/set.v0" set "gopkg.in/fatih/set.v0"
) )
type Statistics struct {
messagesCleared int
memoryCleared int
totalMemoryUsed int
}
// Whisper represents a dark communication interface through the Ethereum // Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer. // network, using its very own P2P communication layer.
type Whisper struct { type Whisper struct {
@ -59,6 +65,8 @@ type Whisper struct {
p2pMsgQueue chan *Envelope p2pMsgQueue chan *Envelope
quit chan struct{} quit chan struct{}
stats Statistics
overflow bool overflow bool
test bool test bool
} }
@ -287,7 +295,8 @@ func (w *Whisper) Unwatch(id string) {
// Send injects a message into the whisper send queue, to be distributed in the // Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles. // network in the coming cycles.
func (w *Whisper) Send(envelope *Envelope) error { func (w *Whisper) Send(envelope *Envelope) error {
return w.add(envelope) _, err := w.add(envelope)
return err
} }
// Start implements node.Service, starting the background data propagation thread // Start implements node.Service, starting the background data propagation thread
@ -360,12 +369,15 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
} }
// inject all envelopes into the internal pool // inject all envelopes into the internal pool
for _, envelope := range envelopes { for _, envelope := range envelopes {
if err := wh.add(envelope); err != nil { cached, err := wh.add(envelope)
if err != nil {
log.Warn(fmt.Sprintf("%v: bad envelope received: [%v], peer will be disconnected", p.peer, err)) log.Warn(fmt.Sprintf("%v: bad envelope received: [%v], peer will be disconnected", p.peer, err))
return fmt.Errorf("invalid envelope") return fmt.Errorf("invalid envelope")
} }
if cached {
p.mark(envelope) p.mark(envelope)
} }
}
case p2pCode: case p2pCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and // this message is not supposed to be forwarded to other peers, and
@ -401,13 +413,13 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the // add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the // whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped. // appropriate time-stamp. In case of error, connection should be dropped.
func (wh *Whisper) add(envelope *Envelope) error { func (wh *Whisper) add(envelope *Envelope) (bool, error) {
now := uint32(time.Now().Unix()) now := uint32(time.Now().Unix())
sent := envelope.Expiry - envelope.TTL sent := envelope.Expiry - envelope.TTL
if sent > now { if sent > now {
if sent-SynchAllowance > now { if sent-SynchAllowance > now {
return fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
} else { } else {
// recalculate PoW, adjusted for the time difference, plus one second for latency // recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.calculatePoW(sent - now + 1) envelope.calculatePoW(sent - now + 1)
@ -416,34 +428,34 @@ func (wh *Whisper) add(envelope *Envelope) error {
if envelope.Expiry < now { if envelope.Expiry < now {
if envelope.Expiry+SynchAllowance*2 < now { if envelope.Expiry+SynchAllowance*2 < now {
return fmt.Errorf("very old message") return false, fmt.Errorf("very old message")
} else { } else {
log.Debug(fmt.Sprintf("expired envelope dropped [%x]", envelope.Hash())) log.Debug(fmt.Sprintf("expired envelope dropped [%x]", envelope.Hash()))
return nil // drop envelope without error return false, nil // drop envelope without error
} }
} }
if len(envelope.Data) > MaxMessageLength { if len(envelope.Data) > MaxMessageLength {
return fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
} }
if len(envelope.Version) > 4 { if len(envelope.Version) > 4 {
return fmt.Errorf("oversized version [%x]", envelope.Hash()) return false, fmt.Errorf("oversized version [%x]", envelope.Hash())
} }
if len(envelope.AESNonce) > AESNonceMaxLength { if len(envelope.AESNonce) > AESNonceMaxLength {
// the standard AES GSM nonce size is 12, // the standard AES GSM nonce size is 12,
// but const gcmStandardNonceSize cannot be accessed directly // but const gcmStandardNonceSize cannot be accessed directly
return fmt.Errorf("oversized AESNonce [%x]", envelope.Hash()) return false, fmt.Errorf("oversized AESNonce [%x]", envelope.Hash())
} }
if len(envelope.Salt) > saltLength { if len(envelope.Salt) > saltLength {
return fmt.Errorf("oversized salt [%x]", envelope.Hash()) return false, fmt.Errorf("oversized salt [%x]", envelope.Hash())
} }
if envelope.PoW() < MinimumPoW && !wh.test { if envelope.PoW() < MinimumPoW && !wh.test {
log.Debug(fmt.Sprintf("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash())) log.Debug(fmt.Sprintf("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash()))
return nil // drop envelope without error return false, nil // drop envelope without error
} }
hash := envelope.Hash() hash := envelope.Hash()
@ -465,12 +477,13 @@ func (wh *Whisper) add(envelope *Envelope) error {
log.Trace(fmt.Sprintf("whisper envelope already cached [%x]\n", envelope.Hash())) log.Trace(fmt.Sprintf("whisper envelope already cached [%x]\n", envelope.Hash()))
} else { } else {
log.Trace(fmt.Sprintf("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope)) log.Trace(fmt.Sprintf("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope))
wh.stats.totalMemoryUsed += envelope.size()
wh.postEvent(envelope, false) // notify the local node about the new message wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil { if wh.mailServer != nil {
wh.mailServer.Archive(envelope) wh.mailServer.Archive(envelope)
} }
} }
return nil return true, nil
} }
// postEvent queues the message for further processing. // postEvent queues the message for further processing.
@ -545,22 +558,32 @@ func (w *Whisper) expire() {
w.poolMu.Lock() w.poolMu.Lock()
defer w.poolMu.Unlock() defer w.poolMu.Unlock()
w.stats.clear()
now := uint32(time.Now().Unix()) now := uint32(time.Now().Unix())
for then, hashSet := range w.expirations { for expiry, hashSet := range w.expirations {
// Short circuit if a future time if expiry < now {
if then > now { w.stats.messagesCleared++
continue
}
// Dump all expired messages and remove timestamp // Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool { hashSet.Each(func(v interface{}) bool {
sz := w.envelopes[v.(common.Hash)].size()
w.stats.memoryCleared += sz
w.stats.totalMemoryUsed -= sz
delete(w.envelopes, v.(common.Hash)) delete(w.envelopes, v.(common.Hash))
delete(w.messages, v.(common.Hash)) delete(w.messages, v.(common.Hash))
return true return true
}) })
w.expirations[then].Clear() w.expirations[expiry].Clear()
delete(w.expirations, expiry)
}
} }
} }
func (w *Whisper) Stats() string {
return fmt.Sprintf("Latest expiry cycle cleared %d messages (%d bytes). Memory usage: %d bytes.",
w.stats.messagesCleared, w.stats.memoryCleared, w.stats.totalMemoryUsed)
}
// envelopes retrieves all the messages currently pooled by the node. // envelopes retrieves all the messages currently pooled by the node.
func (w *Whisper) Envelopes() []*Envelope { func (w *Whisper) Envelopes() []*Envelope {
w.poolMu.RLock() w.poolMu.RLock()
@ -589,6 +612,14 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage {
return result return result
} }
func (w *Whisper) isEnvelopeCached(hash common.Hash) bool {
w.poolMu.Lock()
defer w.poolMu.Unlock()
_, exist := w.envelopes[hash]
return exist
}
func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) { func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) {
w.poolMu.Lock() w.poolMu.Lock()
defer w.poolMu.Unlock() defer w.poolMu.Unlock()
@ -596,6 +627,11 @@ func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) {
w.messages[msg.EnvelopeHash] = msg w.messages[msg.EnvelopeHash] = msg
} }
func (s *Statistics) clear() {
s.memoryCleared = 0
s.messagesCleared = 0
}
func ValidatePublicKey(k *ecdsa.PublicKey) bool { func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0 return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
} }

Loading…
Cancel
Save