@ -39,6 +39,8 @@ import (
set "gopkg.in/fatih/set.v0"
)
// Statistics holds several message-related counter for analytics
// purposes.
type Statistics struct {
messagesCleared int
memoryCleared int
@ -130,8 +132,8 @@ func New(cfg *Config) *Whisper {
}
// MinPow returns the PoW value required by this node.
func ( w * Whisper ) MinPow ( ) float64 {
val , exist := w . settings . Load ( minPowIdx )
func ( whisper * Whisper ) MinPow ( ) float64 {
val , exist := whisper . settings . Load ( minPowIdx )
if ! exist || val == nil {
return DefaultMinimumPoW
}
@ -146,8 +148,8 @@ func (w *Whisper) MinPow() float64 {
// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
// time after PoW was changed. If sufficient time have elapsed or no change of PoW
// have ever occurred, the return value will be the same as return value of MinPow().
func ( w * Whisper ) MinPowTolerance ( ) float64 {
val , exist := w . settings . Load ( minPowToleranceIdx )
func ( whisper * Whisper ) MinPowTolerance ( ) float64 {
val , exist := whisper . settings . Load ( minPowToleranceIdx )
if ! exist || val == nil {
return DefaultMinimumPoW
}
@ -158,8 +160,8 @@ func (w *Whisper) MinPowTolerance() float64 {
// The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will
// be disconnected.
func ( w * Whisper ) BloomFilter ( ) [ ] byte {
val , exist := w . settings . Load ( bloomFilterIdx )
func ( whisper * Whisper ) BloomFilter ( ) [ ] byte {
val , exist := whisper . settings . Load ( bloomFilterIdx )
if ! exist || val == nil {
return nil
}
@ -170,8 +172,8 @@ func (w *Whisper) BloomFilter() []byte {
// time after new bloom was advertised to the peers. If sufficient time have elapsed
// or no change of bloom filter have ever occurred, the return value will be the same
// as return value of BloomFilter().
func ( w * Whisper ) BloomFilterTolerance ( ) [ ] byte {
val , exist := w . settings . Load ( bloomFilterToleranceIdx )
func ( whisper * Whisper ) BloomFilterTolerance ( ) [ ] byte {
val , exist := whisper . settings . Load ( bloomFilterToleranceIdx )
if ! exist || val == nil {
return nil
}
@ -179,24 +181,24 @@ func (w *Whisper) BloomFilterTolerance() []byte {
}
// MaxMessageSize returns the maximum accepted message size.
func ( w * Whisper ) MaxMessageSize ( ) uint32 {
val , _ := w . settings . Load ( maxMsgSizeIdx )
func ( whisper * Whisper ) MaxMessageSize ( ) uint32 {
val , _ := whisper . settings . Load ( maxMsgSizeIdx )
return val . ( uint32 )
}
// Overflow returns an indication if the message queue is full.
func ( w * Whisper ) Overflow ( ) bool {
val , _ := w . settings . Load ( overflowIdx )
func ( whisper * Whisper ) Overflow ( ) bool {
val , _ := whisper . settings . Load ( overflowIdx )
return val . ( bool )
}
// APIs returns the RPC descriptors the Whisper implementation offers
func ( w * Whisper ) APIs ( ) [ ] rpc . API {
func ( whisper * Whisper ) APIs ( ) [ ] rpc . API {
return [ ] rpc . API {
{
Namespace : ProtocolName ,
Version : ProtocolVersionStr ,
Service : NewPublicWhisperAPI ( w ) ,
Service : NewPublicWhisperAPI ( whisper ) ,
Public : true ,
} ,
}
@ -204,31 +206,31 @@ func (w *Whisper) APIs() []rpc.API {
// RegisterServer registers MailServer interface.
// MailServer will process all the incoming messages with p2pRequestCode.
func ( w * Whisper ) RegisterServer ( server MailServer ) {
w . mailServer = server
func ( whisper * Whisper ) RegisterServer ( server MailServer ) {
whisper . mailServer = server
}
// Protocols returns the whisper sub-protocols ran by this particular client.
func ( w * Whisper ) Protocols ( ) [ ] p2p . Protocol {
return [ ] p2p . Protocol { w . protocol }
func ( whisper * Whisper ) Protocols ( ) [ ] p2p . Protocol {
return [ ] p2p . Protocol { whisper . protocol }
}
// Version returns the whisper sub-protocols version number.
func ( w * Whisper ) Version ( ) uint {
return w . protocol . Version
func ( whisper * Whisper ) Version ( ) uint {
return whisper . protocol . Version
}
// SetMaxMessageSize sets the maximal message size allowed by this node
func ( w * Whisper ) SetMaxMessageSize ( size uint32 ) error {
func ( whisper * Whisper ) SetMaxMessageSize ( size uint32 ) error {
if size > MaxMessageSize {
return fmt . Errorf ( "message size too large [%d>%d]" , size , MaxMessageSize )
}
w . settings . Store ( maxMsgSizeIdx , size )
whisper . settings . Store ( maxMsgSizeIdx , size )
return nil
}
// SetBloomFilter sets the new bloom filter
func ( w * Whisper ) SetBloomFilter ( bloom [ ] byte ) error {
func ( whisper * Whisper ) SetBloomFilter ( bloom [ ] byte ) error {
if len ( bloom ) != bloomFilterSize {
return fmt . Errorf ( "invalid bloom filter size: %d" , len ( bloom ) )
}
@ -236,45 +238,45 @@ func (w *Whisper) SetBloomFilter(bloom []byte) error {
b := make ( [ ] byte , bloomFilterSize )
copy ( b , bloom )
w . settings . Store ( bloomFilterIdx , b )
w . notifyPeersAboutBloomFilterChange ( b )
whisper . settings . Store ( bloomFilterIdx , b )
whisper . notifyPeersAboutBloomFilterChange ( b )
go func ( ) {
// allow some time before all the peers have processed the notification
time . Sleep ( time . Duration ( w . syncAllowance ) * time . Second )
w . settings . Store ( bloomFilterToleranceIdx , b )
time . Sleep ( time . Duration ( whisper . syncAllowance ) * time . Second )
whisper . settings . Store ( bloomFilterToleranceIdx , b )
} ( )
return nil
}
// SetMinimumPoW sets the minimal PoW required by this node
func ( w * Whisper ) SetMinimumPoW ( val float64 ) error {
func ( whisper * Whisper ) SetMinimumPoW ( val float64 ) error {
if val < 0.0 {
return fmt . Errorf ( "invalid PoW: %f" , val )
}
w . settings . Store ( minPowIdx , val )
w . notifyPeersAboutPowRequirementChange ( val )
whisper . settings . Store ( minPowIdx , val )
whisper . notifyPeersAboutPowRequirementChange ( val )
go func ( ) {
// allow some time before all the peers have processed the notification
time . Sleep ( time . Duration ( w . syncAllowance ) * time . Second )
w . settings . Store ( minPowToleranceIdx , val )
time . Sleep ( time . Duration ( whisper . syncAllowance ) * time . Second )
whisper . settings . Store ( minPowToleranceIdx , val )
} ( )
return nil
}
// SetMinimumPoW sets the minimal PoW in test environment
func ( w * Whisper ) SetMinimumPowTest ( val float64 ) {
w . settings . Store ( minPowIdx , val )
w . notifyPeersAboutPowRequirementChange ( val )
w . settings . Store ( minPowToleranceIdx , val )
// SetMinimumPowTest sets the minimal PoW in test environment
func ( whisper * Whisper ) SetMinimumPowTest ( val float64 ) {
whisper . settings . Store ( minPowIdx , val )
whisper . notifyPeersAboutPowRequirementChange ( val )
whisper . settings . Store ( minPowToleranceIdx , val )
}
func ( w * Whisper ) notifyPeersAboutPowRequirementChange ( pow float64 ) {
arr := w . getPeers ( )
func ( whisper * Whisper ) notifyPeersAboutPowRequirementChange ( pow float64 ) {
arr := whisper . getPeers ( )
for _ , p := range arr {
err := p . notifyAboutPowRequirementChange ( pow )
if err != nil {
@ -287,8 +289,8 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
}
}
func ( w * Whisper ) notifyPeersAboutBloomFilterChange ( bloom [ ] byte ) {
arr := w . getPeers ( )
func ( whisper * Whisper ) notifyPeersAboutBloomFilterChange ( bloom [ ] byte ) {
arr := whisper . getPeers ( )
for _ , p := range arr {
err := p . notifyAboutBloomFilterChange ( bloom )
if err != nil {
@ -301,23 +303,23 @@ func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) {
}
}
func ( w * Whisper ) getPeers ( ) [ ] * Peer {
arr := make ( [ ] * Peer , len ( w . peers ) )
func ( whisper * Whisper ) getPeers ( ) [ ] * Peer {
arr := make ( [ ] * Peer , len ( whisper . peers ) )
i := 0
w . peerMu . Lock ( )
for p := range w . peers {
whisper . peerMu . Lock ( )
for p := range whisper . peers {
arr [ i ] = p
i ++
}
w . peerMu . Unlock ( )
whisper . peerMu . Unlock ( )
return arr
}
// getPeer retrieves peer by ID
func ( w * Whisper ) getPeer ( peerID [ ] byte ) ( * Peer , error ) {
w . peerMu . Lock ( )
defer w . peerMu . Unlock ( )
for p := range w . peers {
func ( whisper * Whisper ) getPeer ( peerID [ ] byte ) ( * Peer , error ) {
whisper . peerMu . Lock ( )
defer whisper . peerMu . Unlock ( )
for p := range whisper . peers {
id := p . peer . ID ( )
if bytes . Equal ( peerID , id [ : ] ) {
return p , nil
@ -328,8 +330,8 @@ func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
// AllowP2PMessagesFromPeer marks specific peer trusted,
// which will allow it to send historic (expired) messages.
func ( w * Whisper ) AllowP2PMessagesFromPeer ( peerID [ ] byte ) error {
p , err := w . getPeer ( peerID )
func ( whisper * Whisper ) AllowP2PMessagesFromPeer ( peerID [ ] byte ) error {
p , err := whisper . getPeer ( peerID )
if err != nil {
return err
}
@ -342,8 +344,8 @@ func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
// request and respond with a number of peer-to-peer messages (possibly expired),
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func ( w * Whisper ) RequestHistoricMessages ( peerID [ ] byte , envelope * Envelope ) error {
p , err := w . getPeer ( peerID )
func ( whisper * Whisper ) RequestHistoricMessages ( peerID [ ] byte , envelope * Envelope ) error {
p , err := whisper . getPeer ( peerID )
if err != nil {
return err
}
@ -352,22 +354,22 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) err
}
// SendP2PMessage sends a peer-to-peer message to a specific peer.
func ( w * Whisper ) SendP2PMessage ( peerID [ ] byte , envelope * Envelope ) error {
p , err := w . getPeer ( peerID )
func ( whisper * Whisper ) SendP2PMessage ( peerID [ ] byte , envelope * Envelope ) error {
p , err := whisper . getPeer ( peerID )
if err != nil {
return err
}
return w . SendP2PDirect ( p , envelope )
return whisper . SendP2PDirect ( p , envelope )
}
// SendP2PDirect sends a peer-to-peer message to a specific peer.
func ( w * Whisper ) SendP2PDirect ( peer * Peer , envelope * Envelope ) error {
func ( whisper * Whisper ) SendP2PDirect ( peer * Peer , envelope * Envelope ) error {
return p2p . Send ( peer . ws , p2pMessageCode , envelope )
}
// NewKeyPair generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption. Returns ID of the new key pair.
func ( w * Whisper ) NewKeyPair ( ) ( string , error ) {
func ( whisper * Whisper ) NewKeyPair ( ) ( string , error ) {
key , err := crypto . GenerateKey ( )
if err != nil || ! validatePrivateKey ( key ) {
key , err = crypto . GenerateKey ( ) // retry once
@ -384,55 +386,55 @@ func (w *Whisper) NewKeyPair() (string, error) {
return "" , fmt . Errorf ( "failed to generate ID: %s" , err )
}
w . keyMu . Lock ( )
defer w . keyMu . Unlock ( )
whisper . keyMu . Lock ( )
defer whisper . keyMu . Unlock ( )
if w . privateKeys [ id ] != nil {
if whisper . privateKeys [ id ] != nil {
return "" , fmt . Errorf ( "failed to generate unique ID" )
}
w . privateKeys [ id ] = key
whisper . privateKeys [ id ] = key
return id , nil
}
// DeleteKeyPair deletes the specified key if it exists.
func ( w * Whisper ) DeleteKeyPair ( key string ) bool {
w . keyMu . Lock ( )
defer w . keyMu . Unlock ( )
func ( whisper * Whisper ) DeleteKeyPair ( key string ) bool {
whisper . keyMu . Lock ( )
defer whisper . keyMu . Unlock ( )
if w . privateKeys [ key ] != nil {
delete ( w . privateKeys , key )
if whisper . privateKeys [ key ] != nil {
delete ( whisper . privateKeys , key )
return true
}
return false
}
// AddKeyPair imports a asymmetric private key and returns it identifier.
func ( w * Whisper ) AddKeyPair ( key * ecdsa . PrivateKey ) ( string , error ) {
func ( whisper * Whisper ) AddKeyPair ( key * ecdsa . PrivateKey ) ( string , error ) {
id , err := GenerateRandomID ( )
if err != nil {
return "" , fmt . Errorf ( "failed to generate ID: %s" , err )
}
w . keyMu . Lock ( )
w . privateKeys [ id ] = key
w . keyMu . Unlock ( )
whisper . keyMu . Lock ( )
whisper . privateKeys [ id ] = key
whisper . keyMu . Unlock ( )
return id , nil
}
// HasKeyPair checks if the the whisper node is configured with the private key
// of the specified public pair.
func ( w * Whisper ) HasKeyPair ( id string ) bool {
w . keyMu . RLock ( )
defer w . keyMu . RUnlock ( )
return w . privateKeys [ id ] != nil
func ( whisper * Whisper ) HasKeyPair ( id string ) bool {
whisper . keyMu . RLock ( )
defer whisper . keyMu . RUnlock ( )
return whisper . privateKeys [ id ] != nil
}
// GetPrivateKey retrieves the private key of the specified identity.
func ( w * Whisper ) GetPrivateKey ( id string ) ( * ecdsa . PrivateKey , error ) {
w . keyMu . RLock ( )
defer w . keyMu . RUnlock ( )
key := w . privateKeys [ id ]
func ( whisper * Whisper ) GetPrivateKey ( id string ) ( * ecdsa . PrivateKey , error ) {
whisper . keyMu . RLock ( )
defer whisper . keyMu . RUnlock ( )
key := whisper . privateKeys [ id ]
if key == nil {
return nil , fmt . Errorf ( "invalid id" )
}
@ -441,7 +443,7 @@ func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
// GenerateSymKey generates a random symmetric key and stores it under id,
// which is then returned. Will be used in the future for session key exchange.
func ( w * Whisper ) GenerateSymKey ( ) ( string , error ) {
func ( whisper * Whisper ) GenerateSymKey ( ) ( string , error ) {
key := make ( [ ] byte , aesKeyLength )
_ , err := crand . Read ( key )
if err != nil {
@ -455,18 +457,18 @@ func (w *Whisper) GenerateSymKey() (string, error) {
return "" , fmt . Errorf ( "failed to generate ID: %s" , err )
}
w . keyMu . Lock ( )
defer w . keyMu . Unlock ( )
whisper . keyMu . Lock ( )
defer whisper . keyMu . Unlock ( )
if w . symKeys [ id ] != nil {
if whisper . symKeys [ id ] != nil {
return "" , fmt . Errorf ( "failed to generate unique ID" )
}
w . symKeys [ id ] = key
whisper . symKeys [ id ] = key
return id , nil
}
// AddSymKeyDirect stores the key, and returns its id.
func ( w * Whisper ) AddSymKeyDirect ( key [ ] byte ) ( string , error ) {
func ( whisper * Whisper ) AddSymKeyDirect ( key [ ] byte ) ( string , error ) {
if len ( key ) != aesKeyLength {
return "" , fmt . Errorf ( "wrong key size: %d" , len ( key ) )
}
@ -476,23 +478,23 @@ func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) {
return "" , fmt . Errorf ( "failed to generate ID: %s" , err )
}
w . keyMu . Lock ( )
defer w . keyMu . Unlock ( )
whisper . keyMu . Lock ( )
defer whisper . keyMu . Unlock ( )
if w . symKeys [ id ] != nil {
if whisper . symKeys [ id ] != nil {
return "" , fmt . Errorf ( "failed to generate unique ID" )
}
w . symKeys [ id ] = key
whisper . symKeys [ id ] = key
return id , nil
}
// AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
func ( w * Whisper ) AddSymKeyFromPassword ( password string ) ( string , error ) {
func ( whisper * Whisper ) AddSymKeyFromPassword ( password string ) ( string , error ) {
id , err := GenerateRandomID ( )
if err != nil {
return "" , fmt . Errorf ( "failed to generate ID: %s" , err )
}
if w . HasSymKey ( id ) {
if whisper . HasSymKey ( id ) {
return "" , fmt . Errorf ( "failed to generate unique ID" )
}
@ -503,59 +505,59 @@ func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) {
return "" , err
}
w . keyMu . Lock ( )
defer w . keyMu . Unlock ( )
whisper . keyMu . Lock ( )
defer whisper . keyMu . Unlock ( )
// double check is necessary, because deriveKeyMaterial() is very slow
if w . symKeys [ id ] != nil {
if whisper . symKeys [ id ] != nil {
return "" , fmt . Errorf ( "critical error: failed to generate unique ID" )
}
w . symKeys [ id ] = derived
whisper . symKeys [ id ] = derived
return id , nil
}
// HasSymKey returns true if there is a key associated with the given id.
// Otherwise returns false.
func ( w * Whisper ) HasSymKey ( id string ) bool {
w . keyMu . RLock ( )
defer w . keyMu . RUnlock ( )
return w . symKeys [ id ] != nil
func ( whisper * Whisper ) HasSymKey ( id string ) bool {
whisper . keyMu . RLock ( )
defer whisper . keyMu . RUnlock ( )
return whisper . symKeys [ id ] != nil
}
// DeleteSymKey deletes the key associated with the name string if it exists.
func ( w * Whisper ) DeleteSymKey ( id string ) bool {
w . keyMu . Lock ( )
defer w . keyMu . Unlock ( )
if w . symKeys [ id ] != nil {
delete ( w . symKeys , id )
func ( whisper * Whisper ) DeleteSymKey ( id string ) bool {
whisper . keyMu . Lock ( )
defer whisper . keyMu . Unlock ( )
if whisper . symKeys [ id ] != nil {
delete ( whisper . symKeys , id )
return true
}
return false
}
// GetSymKey returns the symmetric key associated with the given id.
func ( w * Whisper ) GetSymKey ( id string ) ( [ ] byte , error ) {
w . keyMu . RLock ( )
defer w . keyMu . RUnlock ( )
if w . symKeys [ id ] != nil {
return w . symKeys [ id ] , nil
func ( whisper * Whisper ) GetSymKey ( id string ) ( [ ] byte , error ) {
whisper . keyMu . RLock ( )
defer whisper . keyMu . RUnlock ( )
if whisper . symKeys [ id ] != nil {
return whisper . symKeys [ id ] , nil
}
return nil , fmt . Errorf ( "non-existent key ID" )
}
// Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages.
func ( w * Whisper ) Subscribe ( f * Filter ) ( string , error ) {
s , err := w . filters . Install ( f )
func ( whisper * Whisper ) Subscribe ( f * Filter ) ( string , error ) {
s , err := whisper . filters . Install ( f )
if err == nil {
w . updateBloomFilter ( f )
whisper . updateBloomFilter ( f )
}
return s , err
}
// updateBloomFilter recalculates the new value of bloom filter,
// and informs the peers if necessary.
func ( w * Whisper ) updateBloomFilter ( f * Filter ) {
func ( whisper * Whisper ) updateBloomFilter ( f * Filter ) {
aggregate := make ( [ ] byte , bloomFilterSize )
for _ , t := range f . Topics {
top := BytesToTopic ( t )
@ -563,21 +565,21 @@ func (w *Whisper) updateBloomFilter(f *Filter) {
aggregate = addBloom ( aggregate , b )
}
if ! bloomFilterMatch ( w . BloomFilter ( ) , aggregate ) {
if ! bloomFilterMatch ( whisper . BloomFilter ( ) , aggregate ) {
// existing bloom filter must be updated
aggregate = addBloom ( w . BloomFilter ( ) , aggregate )
w . SetBloomFilter ( aggregate )
aggregate = addBloom ( whisper . BloomFilter ( ) , aggregate )
whisper . SetBloomFilter ( aggregate )
}
}
// GetFilter returns the filter by id.
func ( w * Whisper ) GetFilter ( id string ) * Filter {
return w . filters . Get ( id )
func ( whisper * Whisper ) GetFilter ( id string ) * Filter {
return whisper . filters . Get ( id )
}
// Unsubscribe removes an installed message handler.
func ( w * Whisper ) Unsubscribe ( id string ) error {
ok := w . filters . Uninstall ( id )
func ( whisper * Whisper ) Unsubscribe ( id string ) error {
ok := whisper . filters . Uninstall ( id )
if ! ok {
return fmt . Errorf ( "Unsubscribe: Invalid ID" )
}
@ -586,8 +588,8 @@ func (w *Whisper) Unsubscribe(id string) error {
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func ( w * Whisper ) Send ( envelope * Envelope ) error {
ok , err := w . add ( envelope )
func ( whisper * Whisper ) Send ( envelope * Envelope ) error {
ok , err := whisper . add ( envelope )
if err != nil {
return err
}
@ -599,13 +601,13 @@ func (w *Whisper) Send(envelope *Envelope) error {
// Start implements node.Service, starting the background data propagation thread
// of the Whisper protocol.
func ( w * Whisper ) Start ( * p2p . Server ) error {
func ( whisper * Whisper ) Start ( * p2p . Server ) error {
log . Info ( "started whisper v." + ProtocolVersionStr )
go w . update ( )
go whisper . update ( )
numCPU := runtime . NumCPU ( )
for i := 0 ; i < numCPU ; i ++ {
go w . processQueue ( )
go whisper . processQueue ( )
}
return nil
@ -613,26 +615,26 @@ func (w *Whisper) Start(*p2p.Server) error {
// Stop implements node.Service, stopping the background data propagation thread
// of the Whisper protocol.
func ( w * Whisper ) Stop ( ) error {
close ( w . quit )
func ( whisper * Whisper ) Stop ( ) error {
close ( whisper . quit )
log . Info ( "whisper stopped" )
return nil
}
// HandlePeer is called by the underlying P2P layer when the whisper sub-protocol
// connection is negotiated.
func ( wh * Whisper ) HandlePeer ( peer * p2p . Peer , rw p2p . MsgReadWriter ) error {
func ( whisper * Whisper ) HandlePeer ( peer * p2p . Peer , rw p2p . MsgReadWriter ) error {
// Create the new peer and start tracking it
whisperPeer := newPeer ( wh , peer , rw )
whisperPeer := newPeer ( whisper , peer , rw )
wh . peerMu . Lock ( )
wh . peers [ whisperPeer ] = struct { } { }
wh . peerMu . Unlock ( )
whisper . peerMu . Lock ( )
whisper . peers [ whisperPeer ] = struct { } { }
whisper . peerMu . Unlock ( )
defer func ( ) {
wh . peerMu . Lock ( )
delete ( wh . peers , whisperPeer )
wh . peerMu . Unlock ( )
whisper . peerMu . Lock ( )
delete ( whisper . peers , whisperPeer )
whisper . peerMu . Unlock ( )
} ( )
// Run the peer handshake and state updates
@ -642,11 +644,11 @@ func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
whisperPeer . start ( )
defer whisperPeer . stop ( )
return wh . runMessageLoop ( whisperPeer , rw )
return whisper . runMessageLoop ( whisperPeer , rw )
}
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
func ( wh * Whisper ) runMessageLoop ( p * Peer , rw p2p . MsgReadWriter ) error {
func ( whisper * Whisper ) runMessageLoop ( p * Peer , rw p2p . MsgReadWriter ) error {
for {
// fetch the next packet
packet , err := rw . ReadMsg ( )
@ -654,7 +656,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log . Warn ( "message loop" , "peer" , p . peer . ID ( ) , "err" , err )
return err
}
if packet . Size > wh . MaxMessageSize ( ) {
if packet . Size > whisper . MaxMessageSize ( ) {
log . Warn ( "oversized message received" , "peer" , p . peer . ID ( ) )
return errors . New ( "oversized message received" )
}
@ -673,7 +675,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
trouble := false
for _ , env := range envelopes {
cached , err := wh . add ( env )
cached , err := whisper . add ( env )
if err != nil {
trouble = true
log . Error ( "bad envelope received, peer will be disconnected" , "peer" , p . peer . ID ( ) , "err" , err )
@ -726,17 +728,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log . Warn ( "failed to decode direct message, peer will be disconnected" , "peer" , p . peer . ID ( ) , "err" , err )
return errors . New ( "invalid direct message" )
}
wh . postEvent ( & envelope , true )
whisper . postEvent ( & envelope , true )
}
case p2pRequestCode :
// Must be processed if mail server is implemented. Otherwise ignore.
if wh . mailServer != nil {
if whisper . mailServer != nil {
var request Envelope
if err := packet . Decode ( & request ) ; err != nil {
log . Warn ( "failed to decode p2p request message, peer will be disconnected" , "peer" , p . peer . ID ( ) , "err" , err )
return errors . New ( "invalid p2p request" )
}
wh . mailServer . DeliverMail ( p , & request )
whisper . mailServer . DeliverMail ( p , & request )
}
default :
// New message types might be implemented in the future versions of Whisper.
@ -750,128 +752,126 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
func ( wh * Whisper ) add ( envelope * Envelope ) ( bool , error ) {
func ( whisper * Whisper ) add ( envelope * Envelope ) ( bool , error ) {
now := uint32 ( time . Now ( ) . Unix ( ) )
sent := envelope . Expiry - envelope . TTL
if sent > now {
if sent - DefaultSyncAllowance > now {
return false , fmt . Errorf ( "envelope created in the future [%x]" , envelope . Hash ( ) )
} else {
}
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope . calculatePoW ( sent - now + 1 )
}
}
if envelope . Expiry < now {
if envelope . Expiry + DefaultSyncAllowance * 2 < now {
return false , fmt . Errorf ( "very old message" )
} else {
}
log . Debug ( "expired envelope dropped" , "hash" , envelope . Hash ( ) . Hex ( ) )
return false , nil // drop envelope without error
}
}
if uint32 ( envelope . size ( ) ) > wh . MaxMessageSize ( ) {
if uint32 ( envelope . size ( ) ) > whisper . MaxMessageSize ( ) {
return false , fmt . Errorf ( "huge messages are not allowed [%x]" , envelope . Hash ( ) )
}
if envelope . PoW ( ) < wh . MinPow ( ) {
if envelope . PoW ( ) < whisper . MinPow ( ) {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by MinPowTolerance()
// for a short period of peer synchronization.
if envelope . PoW ( ) < wh . MinPowTolerance ( ) {
if envelope . PoW ( ) < whisper . MinPowTolerance ( ) {
return false , fmt . Errorf ( "envelope with low PoW received: PoW=%f, hash=[%v]" , envelope . PoW ( ) , envelope . Hash ( ) . Hex ( ) )
}
}
if ! bloomFilterMatch ( wh . BloomFilter ( ) , envelope . Bloom ( ) ) {
if ! bloomFilterMatch ( whisper . BloomFilter ( ) , envelope . Bloom ( ) ) {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by BloomFilterTolerance()
// for a short period of peer synchronization.
if ! bloomFilterMatch ( wh . BloomFilterTolerance ( ) , envelope . Bloom ( ) ) {
if ! bloomFilterMatch ( whisper . BloomFilterTolerance ( ) , envelope . Bloom ( ) ) {
return false , fmt . Errorf ( "envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x" ,
envelope . Hash ( ) . Hex ( ) , wh . BloomFilter ( ) , envelope . Bloom ( ) , envelope . Topic )
envelope . Hash ( ) . Hex ( ) , whisper . BloomFilter ( ) , envelope . Bloom ( ) , envelope . Topic )
}
}
hash := envelope . Hash ( )
wh . poolMu . Lock ( )
_ , alreadyCached := wh . envelopes [ hash ]
whisper . poolMu . Lock ( )
_ , alreadyCached := whisper . envelopes [ hash ]
if ! alreadyCached {
wh . envelopes [ hash ] = envelope
if wh . expirations [ envelope . Expiry ] == nil {
wh . expirations [ envelope . Expiry ] = set . NewNonTS ( )
whisper . envelopes [ hash ] = envelope
if whisper . expirations [ envelope . Expiry ] == nil {
whisper . expirations [ envelope . Expiry ] = set . NewNonTS ( )
}
if ! wh . expirations [ envelope . Expiry ] . Has ( hash ) {
wh . expirations [ envelope . Expiry ] . Add ( hash )
if ! whisper . expirations [ envelope . Expiry ] . Has ( hash ) {
whisper . expirations [ envelope . Expiry ] . Add ( hash )
}
}
wh . poolMu . Unlock ( )
whisper . poolMu . Unlock ( )
if alreadyCached {
log . Trace ( "whisper envelope already cached" , "hash" , envelope . Hash ( ) . Hex ( ) )
} else {
log . Trace ( "cached whisper envelope" , "hash" , envelope . Hash ( ) . Hex ( ) )
wh . statsMu . Lock ( )
wh . stats . memoryUsed += envelope . size ( )
wh . statsMu . Unlock ( )
wh . postEvent ( envelope , false ) // notify the local node about the new message
if wh . mailServer != nil {
wh . mailServer . Archive ( envelope )
whisper . statsMu . Lock ( )
whisper . stats . memoryUsed += envelope . size ( )
whisper . statsMu . Unlock ( )
whisper . postEvent ( envelope , false ) // notify the local node about the new message
if whisper . mailServer != nil {
whisper . mailServer . Archive ( envelope )
}
}
return true , nil
}
// postEvent queues the message for further processing.
func ( w * Whisper ) postEvent ( envelope * Envelope , isP2P bool ) {
func ( whisper * Whisper ) postEvent ( envelope * Envelope , isP2P bool ) {
if isP2P {
w . p2pMsgQueue <- envelope
whisper . p2pMsgQueue <- envelope
} else {
w . checkOverflow ( )
w . messageQueue <- envelope
whisper . checkOverflow ( )
whisper . messageQueue <- envelope
}
}
// checkOverflow checks if message queue overflow occurs and reports it if necessary.
func ( w * Whisper ) checkOverflow ( ) {
queueSize := len ( w . messageQueue )
func ( whisper * Whisper ) checkOverflow ( ) {
queueSize := len ( whisper . messageQueue )
if queueSize == messageQueueLimit {
if ! w . Overflow ( ) {
w . settings . Store ( overflowIdx , true )
if ! whisper . Overflow ( ) {
whisper . settings . Store ( overflowIdx , true )
log . Warn ( "message queue overflow" )
}
} else if queueSize <= messageQueueLimit / 2 {
if w . Overflow ( ) {
w . settings . Store ( overflowIdx , false )
if whisper . Overflow ( ) {
whisper . settings . Store ( overflowIdx , false )
log . Warn ( "message queue overflow fixed (back to normal)" )
}
}
}
// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
func ( w * Whisper ) processQueue ( ) {
func ( whisper * Whisper ) processQueue ( ) {
var e * Envelope
for {
select {
case <- w . quit :
case <- whisper . quit :
return
case e = <- w . messageQueue :
w . filters . NotifyWatchers ( e , false )
case e = <- whisper . messageQueue :
whisper . filters . NotifyWatchers ( e , false )
case e = <- w . p2pMsgQueue :
w . filters . NotifyWatchers ( e , true )
case e = <- whisper . p2pMsgQueue :
whisper . filters . NotifyWatchers ( e , true )
}
}
}
// update loops until the lifetime of the whisper node, updating its internal
// state by expiring stale messages from the pool.
func ( w * Whisper ) update ( ) {
func ( whisper * Whisper ) update ( ) {
// Start a ticker to check for expirations
expire := time . NewTicker ( expirationCycle )
@ -879,9 +879,9 @@ func (w *Whisper) update() {
for {
select {
case <- expire . C :
w . expire ( )
whisper . expire ( )
case <- w . quit :
case <- whisper . quit :
return
}
}
@ -889,46 +889,46 @@ func (w *Whisper) update() {
// expire iterates over all the expiration timestamps, removing all stale
// messages from the pools.
func ( w * Whisper ) expire ( ) {
w . poolMu . Lock ( )
defer w . poolMu . Unlock ( )
func ( whisper * Whisper ) expire ( ) {
whisper . poolMu . Lock ( )
defer whisper . poolMu . Unlock ( )
w . statsMu . Lock ( )
defer w . statsMu . Unlock ( )
w . stats . reset ( )
whisper . statsMu . Lock ( )
defer whisper . statsMu . Unlock ( )
whisper . stats . reset ( )
now := uint32 ( time . Now ( ) . Unix ( ) )
for expiry , hashSet := range w . expirations {
for expiry , hashSet := range whisper . expirations {
if expiry < now {
// Dump all expired messages and remove timestamp
hashSet . Each ( func ( v interface { } ) bool {
sz := w . envelopes [ v . ( common . Hash ) ] . size ( )
delete ( w . envelopes , v . ( common . Hash ) )
w . stats . messagesCleared ++
w . stats . memoryCleared += sz
w . stats . memoryUsed -= sz
sz := whisper . envelopes [ v . ( common . Hash ) ] . size ( )
delete ( whisper . envelopes , v . ( common . Hash ) )
whisper . stats . messagesCleared ++
whisper . stats . memoryCleared += sz
whisper . stats . memoryUsed -= sz
return true
} )
w . expirations [ expiry ] . Clear ( )
delete ( w . expirations , expiry )
whisper . expirations [ expiry ] . Clear ( )
delete ( whisper . expirations , expiry )
}
}
}
// Stats returns the whisper node statistics.
func ( w * Whisper ) Stats ( ) Statistics {
w . statsMu . Lock ( )
defer w . statsMu . Unlock ( )
func ( whisper * Whisper ) Stats ( ) Statistics {
whisper . statsMu . Lock ( )
defer whisper . statsMu . Unlock ( )
return w . stats
return whisper . stats
}
// Envelopes retrieves all the messages currently pooled by the node.
func ( w * Whisper ) Envelopes ( ) [ ] * Envelope {
w . poolMu . RLock ( )
defer w . poolMu . RUnlock ( )
func ( whisper * Whisper ) Envelopes ( ) [ ] * Envelope {
whisper . poolMu . RLock ( )
defer whisper . poolMu . RUnlock ( )
all := make ( [ ] * Envelope , 0 , len ( w . envelopes ) )
for _ , envelope := range w . envelopes {
all := make ( [ ] * Envelope , 0 , len ( whisper . envelopes ) )
for _ , envelope := range whisper . envelopes {
all = append ( all , envelope )
}
return all
@ -936,13 +936,13 @@ func (w *Whisper) Envelopes() []*Envelope {
// Messages iterates through all currently floating envelopes
// and retrieves all the messages, that this filter could decrypt.
func ( w * Whisper ) Messages ( id string ) [ ] * ReceivedMessage {
func ( whisper * Whisper ) Messages ( id string ) [ ] * ReceivedMessage {
result := make ( [ ] * ReceivedMessage , 0 )
w . poolMu . RLock ( )
defer w . poolMu . RUnlock ( )
whisper . poolMu . RLock ( )
defer whisper . poolMu . RUnlock ( )
if filter := w . filters . Get ( id ) ; filter != nil {
for _ , env := range w . envelopes {
if filter := whisper . filters . Get ( id ) ; filter != nil {
for _ , env := range whisper . envelopes {
msg := filter . processEnvelope ( env )
if msg != nil {
result = append ( result , msg )
@ -953,11 +953,11 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage {
}
// isEnvelopeCached checks if envelope with specific hash has already been received and cached.
func ( w * Whisper ) isEnvelopeCached ( hash common . Hash ) bool {
w . poolMu . Lock ( )
defer w . poolMu . Unlock ( )
func ( whisper * Whisper ) isEnvelopeCached ( hash common . Hash ) bool {
whisper . poolMu . Lock ( )
defer whisper . poolMu . Unlock ( )
_ , exist := w . envelopes [ hash ]
_ , exist := whisper . envelopes [ hash ]
return exist
}
@ -1019,7 +1019,7 @@ func BytesToUintBigEndian(b []byte) (res uint64) {
// GenerateRandomID generates a random string, which is then returned to be used as a key id
func GenerateRandomID ( ) ( id string , err error ) {
buf := make ( [ ] byte , keyId Size )
buf := make ( [ ] byte , keyID Size )
_ , err = crand . Read ( buf )
if err != nil {
return "" , err