From 7b501906db5b4bed0cf9972a1b103cc343d7f2d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 13 Apr 2015 11:31:51 +0300 Subject: [PATCH 01/20] whisper: separate out magic number from the code --- whisper/envelope.go | 18 ++++++------------ whisper/message.go | 17 ++++++++++++----- whisper/message_test.go | 16 ++++++++-------- whisper/whisper.go | 3 +++ 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/whisper/envelope.go b/whisper/envelope.go index f35a40a42e..c51c6e6004 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/crypto/ecies" "github.com/ethereum/go-ethereum/rlp" ) @@ -85,27 +84,22 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { } data = data[1:] - if message.Flags&128 == 128 { - if len(data) < 65 { - return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 65") + if message.Flags&signatureFlag == signatureFlag { + if len(data) < signatureLength { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)") } - message.Signature, data = data[:65], data[65:] + message.Signature, data = data[:signatureLength], data[signatureLength:] } message.Payload = data - // Short circuit if the encryption was requested + // Decrypt the message, if requested if key == nil { return message, nil } - // Otherwise try to decrypt the message - message.Payload, err = crypto.Decrypt(key, message.Payload) - switch err { + switch message.decrypt(key) { case nil: return message, nil - case ecies.ErrInvalidPublicKey: // Payload isn't encrypted - return message, err - default: return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err) } diff --git a/whisper/message.go b/whisper/message.go index 2666ee6e00..457cf6def5 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -35,8 +35,9 @@ type Options struct { // NewMessage creates and initializes a non-signed, non-encrypted Whisper message. func NewMessage(payload []byte) *Message { - // Construct an initial flag set: bit #1 = 0 (no signature), rest random - flags := byte(rand.Intn(128)) + // Construct an initial flag set: no signature, rest random + flags := byte(rand.Intn(256)) + flags &= ^signatureFlag // Assemble and return the message return &Message{ @@ -84,7 +85,7 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) // sign calculates and sets the cryptographic signature for the message , also // setting the sign flag. func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { - self.Flags |= 1 << 7 + self.Flags |= signatureFlag self.Signature, err = crypto.Sign(self.hash(), key) return } @@ -102,8 +103,14 @@ func (self *Message) Recover() *ecdsa.PublicKey { } // encrypt encrypts a message payload with a public key. -func (self *Message) encrypt(to *ecdsa.PublicKey) (err error) { - self.Payload, err = crypto.Encrypt(to, self.Payload) +func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) { + self.Payload, err = crypto.Encrypt(key, self.Payload) + return +} + +// decrypt decrypts an encrypted payload with a private key. +func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) { + self.Payload, err = crypto.Decrypt(key, self.Payload) return } diff --git a/whisper/message_test.go b/whisper/message_test.go index 8d4c5e9907..319bc6025b 100644 --- a/whisper/message_test.go +++ b/whisper/message_test.go @@ -16,8 +16,8 @@ func TestMessageSimpleWrap(t *testing.T) { if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil { t.Fatalf("failed to wrap message: %v", err) } - if msg.Flags&128 != 0 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0) + if msg.Flags&signatureFlag != 0 { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0) } if len(msg.Signature) != 0 { t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature) @@ -41,8 +41,8 @@ func TestMessageCleartextSignRecover(t *testing.T) { }); err != nil { t.Fatalf("failed to sign message: %v", err) } - if msg.Flags&128 != 128 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1) + if msg.Flags&signatureFlag != signatureFlag { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag) } if bytes.Compare(msg.Payload, payload) != 0 { t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload) @@ -75,8 +75,8 @@ func TestMessageAnonymousEncryptDecrypt(t *testing.T) { if err != nil { t.Fatalf("failed to encrypt message: %v", err) } - if msg.Flags&128 != 0 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0) + if msg.Flags&signatureFlag != 0 { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0) } if len(msg.Signature) != 0 { t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature) @@ -111,8 +111,8 @@ func TestMessageFullCrypto(t *testing.T) { if err != nil { t.Fatalf("failed to encrypt message: %v", err) } - if msg.Flags&128 != 128 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1) + if msg.Flags&signatureFlag != signatureFlag { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag) } if len(msg.Signature) == 0 { t.Fatalf("no signature found for signed message") diff --git a/whisper/whisper.go b/whisper/whisper.go index d803e27d46..ad29fe16a5 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -20,6 +20,9 @@ const ( statusMsg = 0x0 envelopesMsg = 0x01 whisperVersion = 0x02 + + signatureFlag = byte(1 << 7) + signatureLength = 65 ) type MessageEvent struct { From 9a53390f49b9667db162bf2ef487d0af64b3363d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 13 Apr 2015 12:16:51 +0300 Subject: [PATCH 02/20] whisper: clean up and integrate topics --- whisper/envelope.go | 4 ++-- whisper/filter.go | 2 +- whisper/message.go | 7 ++++++- whisper/topic.go | 35 +++++++++++++++++++++++++++++++++++ whisper/topic_test.go | 38 ++++++++++++++++++++++++++++++++++++++ whisper/util.go | 36 ------------------------------------ whisper/whisper.go | 6 +++--- 7 files changed, 85 insertions(+), 43 deletions(-) create mode 100644 whisper/topic.go create mode 100644 whisper/topic_test.go delete mode 100644 whisper/util.go diff --git a/whisper/envelope.go b/whisper/envelope.go index c51c6e6004..93e3ea1a3f 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -19,7 +19,7 @@ import ( type Envelope struct { Expiry uint32 // Whisper protocol specifies int32, really should be int64 TTL uint32 // ^^^^^^ - Topics [][]byte + Topics []Topic Data []byte Nonce uint32 @@ -28,7 +28,7 @@ type Envelope struct { // NewEnvelope wraps a Whisper message with expiration and destination data // included into an envelope for network forwarding. -func NewEnvelope(ttl time.Duration, topics [][]byte, msg *Message) *Envelope { +func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope { return &Envelope{ Expiry: uint32(time.Now().Add(ttl).Unix()), TTL: uint32(ttl.Seconds()), diff --git a/whisper/filter.go b/whisper/filter.go index b33f2c1a25..7258de3e79 100644 --- a/whisper/filter.go +++ b/whisper/filter.go @@ -5,6 +5,6 @@ import "crypto/ecdsa" type Filter struct { To *ecdsa.PublicKey From *ecdsa.PublicKey - Topics [][]byte + Topics []Topic Fn func(*Message) } diff --git a/whisper/message.go b/whisper/message.go index 457cf6def5..ad31aa5927 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -75,8 +75,13 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) return nil, err } } + // Convert the user topic into whisper ones + topics := make([]Topic, len(options.Topics)) + for i, topic := range options.Topics { + topics[i] = NewTopic(topic) + } // Wrap the processed message, seal it and return - envelope := NewEnvelope(options.TTL, options.Topics, self) + envelope := NewEnvelope(options.TTL, topics, self) envelope.Seal(pow) return envelope, nil diff --git a/whisper/topic.go b/whisper/topic.go new file mode 100644 index 0000000000..10069c9020 --- /dev/null +++ b/whisper/topic.go @@ -0,0 +1,35 @@ +// Contains the Whisper protocol Topic element. For formal details please see +// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics. + +package whisper + +import "github.com/ethereum/go-ethereum/crypto" + +// Topic represents a cryptographically secure, probabilistic partial +// classifications of a message, determined as the first (left) 4 bytes of the +// SHA3 hash of some arbitrary data given by the original author of the message. +type Topic [4]byte + +// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data. +func NewTopic(data []byte) Topic { + prefix := [4]byte{} + copy(prefix[:], crypto.Sha3(data)[:4]) + return Topic(prefix) +} + +// String converts a topic byte array to a string representation. +func (self *Topic) String() string { + return string(self[:]) +} + +// TopicSet represents a hash set to check if a topic exists or not. +type TopicSet map[string]struct{} + +// NewTopicSet creates a topic hash set from a slice of topics. +func NewTopicSet(topics []Topic) TopicSet { + set := make(map[string]struct{}) + for _, topic := range topics { + set[topic.String()] = struct{}{} + } + return TopicSet(set) +} diff --git a/whisper/topic_test.go b/whisper/topic_test.go new file mode 100644 index 0000000000..4626e2ae55 --- /dev/null +++ b/whisper/topic_test.go @@ -0,0 +1,38 @@ +package whisper + +import ( + "bytes" + "testing" +) + +var topicCreationTests = []struct { + data []byte + hash [4]byte +}{ + {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil}, + {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}}, + {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")}, +} + +func TestTopicCreation(t *testing.T) { + for i, tt := range topicCreationTests { + topic := NewTopic(tt.data) + if bytes.Compare(topic[:], tt.hash[:]) != 0 { + t.Errorf("test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } +} + +func TestTopicSetCreation(t *testing.T) { + topics := make([]Topic, len(topicCreationTests)) + for i, tt := range topicCreationTests { + topics[i] = NewTopic(tt.data) + } + set := NewTopicSet(topics) + for i, tt := range topicCreationTests { + topic := NewTopic(tt.data) + if _, ok := set[topic.String()]; !ok { + t.Errorf("topic %d: not found in set", i) + } + } +} diff --git a/whisper/util.go b/whisper/util.go deleted file mode 100644 index 7a222395fe..0000000000 --- a/whisper/util.go +++ /dev/null @@ -1,36 +0,0 @@ -package whisper - -import "github.com/ethereum/go-ethereum/crypto" - -func hashTopic(topic []byte) []byte { - return crypto.Sha3(topic)[:4] -} - -// NOTE this isn't DRY, but I don't want to iterate twice. - -// Returns a formatted topics byte slice. -// data: unformatted data (e.g., no hashes needed) -func Topics(data [][]byte) [][]byte { - d := make([][]byte, len(data)) - for i, byts := range data { - d[i] = hashTopic(byts) - } - return d -} - -func TopicsFromString(data ...string) [][]byte { - d := make([][]byte, len(data)) - for i, str := range data { - d[i] = hashTopic([]byte(str)) - } - return d -} - -func bytesToMap(s [][]byte) map[string]struct{} { - m := make(map[string]struct{}) - for _, topic := range s { - m[string(topic)] = struct{}{} - } - - return m -} diff --git a/whisper/whisper.go b/whisper/whisper.go index ad29fe16a5..d9affe09b1 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -119,7 +119,7 @@ func (self *Whisper) Watch(opts Filter) int { return self.filters.Install(filter.Generic{ Str1: string(crypto.FromECDSAPub(opts.To)), Str2: string(crypto.FromECDSAPub(opts.From)), - Data: bytesToMap(opts.Topics), + Data: NewTopicSet(opts.Topics), Fn: func(data interface{}) { opts.Fn(data.(*Message)) }, @@ -272,9 +272,9 @@ func (self *Whisper) Protocol() p2p.Protocol { return self.protocol } -func createFilter(message *Message, topics [][]byte, key *ecdsa.PrivateKey) filter.Filter { +func createFilter(message *Message, topics []Topic, key *ecdsa.PrivateKey) filter.Filter { return filter.Generic{ Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: bytesToMap(topics), + Data: NewTopicSet(topics), } } From 89358d25a4e27194638bfd1c685cc8d88910786e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 13 Apr 2015 13:15:01 +0300 Subject: [PATCH 03/20] whisper: start adding integration tests --- whisper/whisper_test.go | 136 ++++++++++++++++++++++++++++++++++------ 1 file changed, 118 insertions(+), 18 deletions(-) diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index b29e34a5e3..27c57eee18 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -4,35 +4,135 @@ import ( "fmt" "testing" "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/nat" ) -func TestEvent(t *testing.T) { - res := make(chan *Message, 1) - whisper := New() - id := whisper.NewIdentity() - whisper.Watch(Filter{ - To: &id.PublicKey, +type testNode struct { + server *p2p.Server + client *Whisper +} + +func startNodes(n int) ([]*testNode, error) { + cluster := make([]*testNode, 0, n) + for i := 0; i < n; i++ { + shh := New() + + // Generate the node identity + key, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + name := common.MakeName(fmt.Sprintf("whisper-go-test-%d", i), "1.0") + + // Create an Ethereum server to communicate through + server := &p2p.Server{ + PrivateKey: key, + MaxPeers: 10, + Name: name, + Protocols: []p2p.Protocol{shh.Protocol()}, + ListenAddr: fmt.Sprintf(":%d", 30300+i), + NAT: nat.Any(), + } + if err := server.Start(); err != nil { + return nil, err + } + // Peer online, store and iterate + cluster = append(cluster, &testNode{ + server: server, + client: shh, + }) + } + return cluster, nil +} + +func stopNodes(cluster []*testNode) { + for _, node := range cluster { + node.server.Stop() + } +} + +func TestSelfMessage(t *testing.T) { + cluster, err := startNodes(1) + if err != nil { + t.Fatalf("failed to boot test cluster: %v", err) + } + defer stopNodes(cluster) + + client := cluster[0].client + + // Start watching for self messages, signal any arrivals + self := client.NewIdentity() + done := make(chan struct{}) + + client.Watch(Filter{ + To: &self.PublicKey, Fn: func(msg *Message) { - res <- msg + close(done) }, }) - - msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + // Send a dummy message to oneself + msg := NewMessage([]byte("hello whisper")) envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + From: self, + To: &self.PublicKey, TTL: DefaultTimeToLive, - From: id, - To: &id.PublicKey, }) if err != nil { - fmt.Println(err) - t.FailNow() + t.Fatalf("failed to wrap message: %v", err) + } + // Dump the message into the system and wait for it to pop back out + if err := client.Send(envelope); err != nil { + t.Fatalf("failed to send self-message: %v", err) } + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("self-message receive timeout") + } +} - tick := time.NewTicker(time.Second) - whisper.postEvent(envelope) +func TestDirectMessage(t *testing.T) { + cluster, err := startNodes(2) + if err != nil { + t.Fatalf("failed to boot test cluster: %v", err) + } + defer stopNodes(cluster) + + sender := cluster[0].client + senderId := sender.NewIdentity() + + recipient := cluster[1].client + recipientId := recipient.NewIdentity() + + // Watch for arriving messages on the recipient + done := make(chan struct{}) + recipient.Watch(Filter{ + To: &recipientId.PublicKey, + Fn: func(msg *Message) { + close(done) + }, + }) + // Send a dummy message from the sender + msg := NewMessage([]byte("hello whisper")) + envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + From: senderId, + To: &recipientId.PublicKey, + TTL: DefaultTimeToLive, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send direct message: %v", err) + } + // Wait for an arrival or a timeout select { - case <-res: - case <-tick.C: - t.Error("did not receive message") + case <-done: + case <-time.After(time.Second): + t.Fatalf("direct message receive timeout") } } From cb707ba50ce8626aa1b0e87d7526416a9592852a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 13 Apr 2015 16:19:34 +0300 Subject: [PATCH 04/20] whisper: push work in progress for bug report --- whisper/whisper.go | 13 +++++++------ whisper/whisper_test.go | 12 ++++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/whisper/whisper.go b/whisper/whisper.go index d9affe09b1..f3b539d2c0 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -25,17 +25,19 @@ const ( signatureLength = 65 ) +const ( + DefaultTimeToLive = 50 * time.Second + DefaultProofOfWork = 50 * time.Millisecond +) + type MessageEvent struct { To *ecdsa.PrivateKey From *ecdsa.PublicKey Message *Message } -const ( - DefaultTimeToLive = 50 * time.Second - DefaultProofOfWork = 50 * time.Millisecond -) - +// Whisper represents a dark communication interface through the Ethereum +// network, using its very own P2P communication layer. type Whisper struct { protocol p2p.Protocol filters *filter.Filters @@ -199,7 +201,6 @@ func (self *Whisper) add(envelope *Envelope) error { self.expiry[envelope.Expiry].Add(hash) go self.postEvent(envelope) } - glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope) return nil diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 27c57eee18..5c29956cfa 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/nat" ) @@ -17,6 +18,7 @@ type testNode struct { } func startNodes(n int) ([]*testNode, error) { + // Start up the cluster of nodes cluster := make([]*testNode, 0, n) for i := 0; i < n; i++ { shh := New() @@ -46,6 +48,11 @@ func startNodes(n int) ([]*testNode, error) { client: shh, }) } + // Manually wire together the cluster nodes + root := cluster[0].server.Self() + for _, node := range cluster[1:] { + node.server.SuggestPeer(root) + } return cluster, nil } @@ -56,6 +63,7 @@ func stopNodes(cluster []*testNode) { } func TestSelfMessage(t *testing.T) { + // Start the single node cluster cluster, err := startNodes(1) if err != nil { t.Fatalf("failed to boot test cluster: %v", err) @@ -96,6 +104,10 @@ func TestSelfMessage(t *testing.T) { } func TestDirectMessage(t *testing.T) { + glog.SetV(6) + glog.SetToStderr(true) + + // Start the sender-recipient cluster cluster, err := startNodes(2) if err != nil { t.Fatalf("failed to boot test cluster: %v", err) From 4af7743663fa3e444668b90878f64d0df4316deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 11:12:09 +0300 Subject: [PATCH 05/20] whisper: add utility functions for creating topics --- whisper/message.go | 9 ++------- whisper/topic.go | 26 ++++++++++++++++++++++++++ whisper/topic_test.go | 31 ++++++++++++++++++++++++++++++- 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/whisper/message.go b/whisper/message.go index ad31aa5927..a4de18f650 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -30,7 +30,7 @@ type Options struct { From *ecdsa.PrivateKey To *ecdsa.PublicKey TTL time.Duration - Topics [][]byte + Topics []Topic } // NewMessage creates and initializes a non-signed, non-encrypted Whisper message. @@ -75,13 +75,8 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) return nil, err } } - // Convert the user topic into whisper ones - topics := make([]Topic, len(options.Topics)) - for i, topic := range options.Topics { - topics[i] = NewTopic(topic) - } // Wrap the processed message, seal it and return - envelope := NewEnvelope(options.TTL, topics, self) + envelope := NewEnvelope(options.TTL, options.Topics, self) envelope.Seal(pow) return envelope, nil diff --git a/whisper/topic.go b/whisper/topic.go index 10069c9020..7792e437fa 100644 --- a/whisper/topic.go +++ b/whisper/topic.go @@ -17,6 +17,32 @@ func NewTopic(data []byte) Topic { return Topic(prefix) } +// NewTopics creates a list of topics from a list of binary data elements, by +// iteratively calling NewTopic on each of them. +func NewTopics(data ...[]byte) []Topic { + topics := make([]Topic, len(data)) + for i, element := range data { + topics[i] = NewTopic(element) + } + return topics +} + +// NewTopicFromString creates a topic using the binary data contents of the +// specified string. +func NewTopicFromString(data string) Topic { + return NewTopic([]byte(data)) +} + +// NewTopicsFromStrings creates a list of topics from a list of textual data +// elements, by iteratively calling NewTopicFromString on each of them. +func NewTopicsFromStrings(data ...string) []Topic { + topics := make([]Topic, len(data)) + for i, element := range data { + topics[i] = NewTopicFromString(element) + } + return topics +} + // String converts a topic byte array to a string representation. func (self *Topic) String() string { return string(self[:]) diff --git a/whisper/topic_test.go b/whisper/topic_test.go index 4626e2ae55..5f85839872 100644 --- a/whisper/topic_test.go +++ b/whisper/topic_test.go @@ -15,10 +15,39 @@ var topicCreationTests = []struct { } func TestTopicCreation(t *testing.T) { + // Create the topics individually for i, tt := range topicCreationTests { topic := NewTopic(tt.data) if bytes.Compare(topic[:], tt.hash[:]) != 0 { - t.Errorf("test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } + for i, tt := range topicCreationTests { + topic := NewTopicFromString(string(tt.data)) + if bytes.Compare(topic[:], tt.hash[:]) != 0 { + t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } + // Create the topics in batches + binaryData := make([][]byte, len(topicCreationTests)) + for i, tt := range topicCreationTests { + binaryData[i] = tt.data + } + textualData := make([]string, len(topicCreationTests)) + for i, tt := range topicCreationTests { + textualData[i] = string(tt.data) + } + + topics := NewTopics(binaryData...) + for i, tt := range topicCreationTests { + if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { + t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) + } + } + topics = NewTopicsFromStrings(textualData...) + for i, tt := range topicCreationTests { + if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { + t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) } } } From 5205b2f19b9173580f9a9e727d74e202b8dd0f67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 12:12:47 +0300 Subject: [PATCH 06/20] whisper: fix anonymous broadcast drop, add broadcast tests --- whisper/envelope.go | 7 ++++- whisper/whisper.go | 46 +++++++++++++++++---------- whisper/whisper_test.go | 70 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 99 insertions(+), 24 deletions(-) diff --git a/whisper/envelope.go b/whisper/envelope.go index 93e3ea1a3f..9daaf64908 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" "github.com/ethereum/go-ethereum/rlp" ) @@ -96,10 +97,14 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { if key == nil { return message, nil } - switch message.decrypt(key) { + err = message.decrypt(key) + switch err { case nil: return message, nil + case ecies.ErrInvalidPublicKey: // Payload isn't encrypted + return message, err + default: return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err) } diff --git a/whisper/whisper.go b/whisper/whisper.go index f3b539d2c0..a4ec943e8f 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -136,8 +136,8 @@ func (self *Whisper) Messages(id int) (messages []*Message) { filter := self.filters.Get(id) if filter != nil { for _, e := range self.messages { - if msg, key := self.open(e); msg != nil { - f := createFilter(msg, e.Topics, key) + if msg := self.open(e); msg != nil { + f := createFilter(msg, e.Topics) if self.filters.Match(filter, f) { messages = append(messages, msg) } @@ -251,31 +251,45 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) { return } +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol +} + +// postEvent opens an envelope with the configured identities and delivers the +// message upstream from application processing. func (self *Whisper) postEvent(envelope *Envelope) { - if message, key := self.open(envelope); message != nil { - self.filters.Notify(createFilter(message, envelope.Topics, key), message) + if message := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics), message) } } -func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) { +// open tries to decrypt a whisper envelope with all the configured identities, +// returning the decrypted message and the key used to achieve it. If not keys +// are configured, open will return the payload as if non encrypted. +func (self *Whisper) open(envelope *Envelope) *Message { + // Short circuit if no identity is set, and assume clear-text + if len(self.keys) == 0 { + if message, err := envelope.Open(nil); err == nil { + return message + } + } + // Iterate over the keys and try to decrypt the message for _, key := range self.keys { - if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) { + message, err := envelope.Open(key) + if err == nil || err == ecies.ErrInvalidPublicKey { message.To = &key.PublicKey - - return message, key + return message } } - - return nil, nil -} - -func (self *Whisper) Protocol() p2p.Protocol { - return self.protocol + // Failed to decrypt, don't return anything + return nil } -func createFilter(message *Message, topics []Topic, key *ecdsa.PrivateKey) filter.Filter { +// createFilter creates a message filter to check against installed handlers. +func createFilter(message *Message, topics []Topic) filter.Filter { return filter.Generic{ - Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())), + Str1: string(crypto.FromECDSAPub(message.To)), + Str2: string(crypto.FromECDSAPub(message.Recover())), Data: NewTopicSet(topics), } } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 5c29956cfa..3f903a9dc5 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/nat" ) @@ -83,7 +82,7 @@ func TestSelfMessage(t *testing.T) { }, }) // Send a dummy message to oneself - msg := NewMessage([]byte("hello whisper")) + msg := NewMessage([]byte("self whisper")) envelope, err := msg.Wrap(DefaultProofOfWork, Options{ From: self, To: &self.PublicKey, @@ -104,9 +103,6 @@ func TestSelfMessage(t *testing.T) { } func TestDirectMessage(t *testing.T) { - glog.SetV(6) - glog.SetToStderr(true) - // Start the sender-recipient cluster cluster, err := startNodes(2) if err != nil { @@ -129,7 +125,7 @@ func TestDirectMessage(t *testing.T) { }, }) // Send a dummy message from the sender - msg := NewMessage([]byte("hello whisper")) + msg := NewMessage([]byte("direct whisper")) envelope, err := msg.Wrap(DefaultProofOfWork, Options{ From: senderId, To: &recipientId.PublicKey, @@ -139,7 +135,7 @@ func TestDirectMessage(t *testing.T) { t.Fatalf("failed to wrap message: %v", err) } if err := sender.Send(envelope); err != nil { - t.Fatalf("failed to send direct message: %v", err) + t.Fatalf("failed to send direct message: %v", err) } // Wait for an arrival or a timeout select { @@ -148,3 +144,63 @@ func TestDirectMessage(t *testing.T) { t.Fatalf("direct message receive timeout") } } + +func TestAnonymousBroadcast(t *testing.T) { + testBroadcast(true, t) +} + +func TestIdentifiedBroadcast(t *testing.T) { + testBroadcast(false, t) +} + +func testBroadcast(anonymous bool, t *testing.T) { + // Start the single sender multi recipient cluster + cluster, err := startNodes(3) + if err != nil { + t.Fatalf("failed to boot test cluster: %v", err) + } + defer stopNodes(cluster) + + sender := cluster[0].client + targets := make([]*Whisper, len(cluster)-1) + for i, node := range cluster[1:] { + targets[i] = node.client + if !anonymous { + targets[i].NewIdentity() + } + } + // Watch for arriving messages on the recipients + dones := make([]chan struct{}, len(targets)) + for i := 0; i < len(targets); i++ { + done := make(chan struct{}) // need for the closure + dones[i] = done + + targets[i].Watch(Filter{ + Topics: NewTopicsFromStrings("broadcast topic"), + Fn: func(msg *Message) { + close(done) + }, + }) + } + // Send a dummy message from the sender + msg := NewMessage([]byte("broadcast whisper")) + envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + Topics: NewTopicsFromStrings("broadcast topic"), + TTL: DefaultTimeToLive, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send broadcast message: %v", err) + } + // Wait for an arrival on each recipient, or timeouts + timeout := time.After(time.Second) + for _, done := range dones { + select { + case <-done: + case <-timeout: + t.Fatalf("broadcast message receive timeout") + } + } +} From 59bff465053312013253f8c4288b6fb0c1e3e4e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 13:24:43 +0300 Subject: [PATCH 07/20] whisper: general cleanups, documentation --- whisper/envelope.go | 12 +-- whisper/filter.go | 11 +- whisper/peer.go | 4 - whisper/whisper.go | 248 ++++++++++++++++++++++++-------------------- 4 files changed, 146 insertions(+), 129 deletions(-) diff --git a/whisper/envelope.go b/whisper/envelope.go index 9daaf64908..0a817e26ef 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -24,7 +24,7 @@ type Envelope struct { Data []byte Nonce uint32 - hash common.Hash + hash common.Hash // Cached hash of the envelope to avoid rehashing every time } // NewEnvelope wraps a Whisper message with expiration and destination data @@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) { } } -// valid checks whether the claimed proof of work was indeed executed. -// TODO: Is this really useful? Isn't this always true? -func (self *Envelope) valid() bool { - d := make([]byte, 64) - copy(d[:32], self.rlpWithoutNonce()) - binary.BigEndian.PutUint32(d[60:], self.Nonce) - - return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0 -} - // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. func (self *Envelope) rlpWithoutNonce() []byte { enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data}) diff --git a/whisper/filter.go b/whisper/filter.go index 7258de3e79..8fcc45afd2 100644 --- a/whisper/filter.go +++ b/whisper/filter.go @@ -1,10 +1,13 @@ +// Contains the message filter for fine grained subscriptions. + package whisper import "crypto/ecdsa" +// Filter is used to subscribe to specific types of whisper messages. type Filter struct { - To *ecdsa.PublicKey - From *ecdsa.PublicKey - Topics []Topic - Fn func(*Message) + To *ecdsa.PublicKey // Recipient of the message + From *ecdsa.PublicKey // Sender of the message + Topics []Topic // Topics to watch messages on + Fn func(*Message) // Handler in case of a match } diff --git a/whisper/peer.go b/whisper/peer.go index 338166c25f..e50c9ec37a 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -9,10 +9,6 @@ import ( "gopkg.in/fatih/set.v0" ) -const ( - protocolVersion uint64 = 0x02 -) - type peer struct { host *Whisper peer *p2p.Peer diff --git a/whisper/whisper.go b/whisper/whisper.go index a4ec943e8f..f51f14a9f9 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -2,7 +2,6 @@ package whisper import ( "crypto/ecdsa" - "errors" "sync" "time" @@ -17,12 +16,16 @@ import ( ) const ( - statusMsg = 0x0 - envelopesMsg = 0x01 - whisperVersion = 0x02 + statusMsg = 0x00 + envelopesMsg = 0x01 + + protocolVersion uint64 = 0x02 + protocolName = "shh" signatureFlag = byte(1 << 7) signatureLength = 65 + + expirationTicks = 800 * time.Millisecond ) const ( @@ -42,9 +45,9 @@ type Whisper struct { protocol p2p.Protocol filters *filter.Filters - mmu sync.RWMutex - messages map[common.Hash]*Envelope - expiry map[uint32]*set.SetNonTS + mmu sync.RWMutex // Message mutex to sync the below pool + messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node + expiry map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) quit chan struct{} @@ -63,8 +66,8 @@ func New() *Whisper { // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ - Name: "shh", - Version: uint(whisperVersion), + Name: protocolName, + Version: uint(protocolVersion), Length: 2, Run: whisper.msgHandler, } @@ -72,42 +75,74 @@ func New() *Whisper { return whisper } -func (self *Whisper) Version() uint { - return self.protocol.Version -} - -func (self *Whisper) Start() { - glog.V(logger.Info).Infoln("Whisper started") - go self.update() -} - -func (self *Whisper) Stop() { - close(self.quit) +// Protocol returns the whisper sub-protocol handler for this particular client. +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol } -func (self *Whisper) Send(envelope *Envelope) error { - return self.add(envelope) +// Version returns the whisper sub-protocols version number. +func (self *Whisper) Version() uint { + return self.protocol.Version } +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { key, err := crypto.GenerateKey() if err != nil { panic(err) } - self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key return key } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { return self.keys[string(crypto.FromECDSAPub(key))] != nil } +// GetIdentity retrieves the private key of the specified public identity. func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { return self.keys[string(crypto.FromECDSAPub(key))] } +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(options Filter) int { + filter := filter.Generic{ + Str1: string(crypto.FromECDSAPub(options.To)), + Str2: string(crypto.FromECDSAPub(options.From)), + Data: NewTopicSet(options.Topics), + Fn: func(data interface{}) { + options.Fn(data.(*Message)) + }, + } + return self.filters.Install(filter) +} + +// Unwatch removes an installed message handler. +func (self *Whisper) Unwatch(id int) { + self.filters.Uninstall(id) +} + +// Send injects a message into the whisper send queue, to be distributed in the +// network in the coming cycles. +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + +func (self *Whisper) Start() { + glog.V(logger.Info).Infoln("Whisper started") + go self.update() +} + +func (self *Whisper) Stop() { + close(self.quit) + glog.V(logger.Info).Infoln("Whisper stopped") +} + // func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { // k := string(crypto.FromECDSAPub(key)) // if _, ok := self.keys[k]; ok { @@ -117,22 +152,7 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { // return false // } -func (self *Whisper) Watch(opts Filter) int { - return self.filters.Install(filter.Generic{ - Str1: string(crypto.FromECDSAPub(opts.To)), - Str2: string(crypto.FromECDSAPub(opts.From)), - Data: NewTopicSet(opts.Topics), - Fn: func(data interface{}) { - opts.Fn(data.(*Message)) - }, - }) -} - -func (self *Whisper) Unwatch(id int) { - self.filters.Uninstall(id) -} - -func (self *Whisper) Messages(id int) (messages []*Message) { +/*func (self *Whisper) Messages(id int) (messages []*Message) { filter := self.filters.Get(id) if filter != nil { for _, e := range self.messages { @@ -146,6 +166,36 @@ func (self *Whisper) Messages(id int) (messages []*Message) { } return +}*/ + +// add inserts a new envelope into the message pool to be distributed within the +// whisper network. It also inserts the envelope into the expiration pool at the +// appropriate time-stamp. +func (self *Whisper) add(envelope *Envelope) error { + self.mmu.Lock() + defer self.mmu.Unlock() + + // Insert the message into the tracked pool + hash := envelope.Hash() + if _, ok := self.messages[hash]; ok { + glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + return nil + } + self.messages[hash] = envelope + + // Insert the message into the expiration pool for later removal + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + if !self.expiry[envelope.Expiry].Has(hash) { + self.expiry[envelope.Expiry].Add(hash) + + // Notify the local node of a message arrival + go self.postEvent(envelope) + } + glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope) + + return nil } // Main handler for passing whisper messages to whisper peer objects @@ -182,53 +232,76 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { } } -// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. -func (self *Whisper) add(envelope *Envelope) error { - if !envelope.valid() { - return errors.New("invalid pow provided for envelope") +// postEvent opens an envelope with the configured identities and delivers the +// message upstream from application processing. +func (self *Whisper) postEvent(envelope *Envelope) { + if message := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics), message) } +} - self.mmu.Lock() - defer self.mmu.Unlock() - - hash := envelope.Hash() - self.messages[hash] = envelope - if self.expiry[envelope.Expiry] == nil { - self.expiry[envelope.Expiry] = set.NewNonTS() +// open tries to decrypt a whisper envelope with all the configured identities, +// returning the decrypted message and the key used to achieve it. If not keys +// are configured, open will return the payload as if non encrypted. +func (self *Whisper) open(envelope *Envelope) *Message { + // Short circuit if no identity is set, and assume clear-text + if len(self.keys) == 0 { + if message, err := envelope.Open(nil); err == nil { + return message + } } - - if !self.expiry[envelope.Expiry].Has(hash) { - self.expiry[envelope.Expiry].Add(hash) - go self.postEvent(envelope) + // Iterate over the keys and try to decrypt the message + for _, key := range self.keys { + message, err := envelope.Open(key) + if err == nil || err == ecies.ErrInvalidPublicKey { + message.To = &key.PublicKey + return message + } } - glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope) - + // Failed to decrypt, don't return anything return nil } +// createFilter creates a message filter to check against installed handlers. +func createFilter(message *Message, topics []Topic) filter.Filter { + return filter.Generic{ + Str1: string(crypto.FromECDSAPub(message.To)), + Str2: string(crypto.FromECDSAPub(message.Recover())), + Data: NewTopicSet(topics), + } +} + +// update loops until the lifetime of the whisper node, updating its internal +// state by expiring stale messages from the pool. func (self *Whisper) update() { - expire := time.NewTicker(800 * time.Millisecond) -out: + // Start a ticker to check for expirations + expire := time.NewTicker(expirationTicks) + + // Repeat updates until termination is requested for { select { case <-expire.C: self.expire() + case <-self.quit: - break out + return } } } +// expire iterates over all the expiration timestamps, removing all stale +// messages from the pools. func (self *Whisper) expire() { self.mmu.Lock() defer self.mmu.Unlock() now := uint32(time.Now().Unix()) for then, hashSet := range self.expiry { + // Short circuit if a future time if then > now { continue } - + // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { delete(self.messages, v.(common.Hash)) return true @@ -237,59 +310,14 @@ func (self *Whisper) expire() { } } -func (self *Whisper) envelopes() (envelopes []*Envelope) { +// envelopes retrieves all the messages currently pooled by the node. +func (self *Whisper) envelopes() []*Envelope { self.mmu.RLock() defer self.mmu.RUnlock() - envelopes = make([]*Envelope, len(self.messages)) - i := 0 + envelopes := make([]*Envelope, 0, len(self.messages)) for _, envelope := range self.messages { - envelopes[i] = envelope - i++ - } - - return -} - -func (self *Whisper) Protocol() p2p.Protocol { - return self.protocol -} - -// postEvent opens an envelope with the configured identities and delivers the -// message upstream from application processing. -func (self *Whisper) postEvent(envelope *Envelope) { - if message := self.open(envelope); message != nil { - self.filters.Notify(createFilter(message, envelope.Topics), message) - } -} - -// open tries to decrypt a whisper envelope with all the configured identities, -// returning the decrypted message and the key used to achieve it. If not keys -// are configured, open will return the payload as if non encrypted. -func (self *Whisper) open(envelope *Envelope) *Message { - // Short circuit if no identity is set, and assume clear-text - if len(self.keys) == 0 { - if message, err := envelope.Open(nil); err == nil { - return message - } - } - // Iterate over the keys and try to decrypt the message - for _, key := range self.keys { - message, err := envelope.Open(key) - if err == nil || err == ecies.ErrInvalidPublicKey { - message.To = &key.PublicKey - return message - } - } - // Failed to decrypt, don't return anything - return nil -} - -// createFilter creates a message filter to check against installed handlers. -func createFilter(message *Message, topics []Topic) filter.Filter { - return filter.Generic{ - Str1: string(crypto.FromECDSAPub(message.To)), - Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: NewTopicSet(topics), + envelopes = append(envelopes, envelope) } + return envelopes } From e2b7498c9dca34a1ebe29ba2eb6d5e1e2b48df5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 14:28:59 +0300 Subject: [PATCH 08/20] whisper: add known message expiration to peers, cleanup --- whisper/peer.go | 163 +++++++++++++++++++++++++++++---------------- whisper/whisper.go | 77 ++++++++++----------- 2 files changed, 145 insertions(+), 95 deletions(-) diff --git a/whisper/peer.go b/whisper/peer.go index e50c9ec37a..f077dbe70b 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -4,106 +4,155 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" ) +// peer represents a whisper protocol peer connection. type peer struct { host *Whisper peer *p2p.Peer ws p2p.MsgReadWriter - // XXX Eventually this is going to reach exceptional large space. We need an expiry here - known *set.Set + known *set.Set // Messages already known by the peer to avoid wasting bandwidth quit chan struct{} } -func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { - return &peer{host, p, ws, set.New(), make(chan struct{})} -} - -func (self *peer) init() error { - if err := self.handleStatus(); err != nil { - return err +// newPeer creates and initializes a new whisper peer connection, returning either +// the newly constructed link or a failure reason. +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { + p := &peer{ + host: host, + peer: remote, + ws: rw, + known: set.New(), + quit: make(chan struct{}), } - - return nil + if err := p.handshake(); err != nil { + return nil, err + } + return p, nil } +// start initiates the peer updater, periodically broadcasting the whisper packets +// into the network. func (self *peer) start() { go self.update() self.peer.Debugln("whisper started") } +// stop terminates the peer updater, stopping message forwarding to it. func (self *peer) stop() { + close(self.quit) self.peer.Debugln("whisper stopped") +} - close(self.quit) +// handshake sends the protocol initiation status message to the remote peer and +// verifies the remote status too. +func (self *peer) handshake() error { + // Send own status message, fetch remote one + if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { + return err + } + packet, err := self.ws.ReadMsg() + if err != nil { + return err + } + if packet.Code != statusCode { + return fmt.Errorf("peer sent %x before status packet", packet.Code) + } + // Decode the rest of the status packet and verify protocol match + s := rlp.NewStream(packet.Payload) + if _, err := s.List(); err != nil { + return fmt.Errorf("bad status message: %v", err) + } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("bad status message: %v", err) + } + if peerVersion != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) + } + return packet.Discard() // ignore anything after protocol version } +// update executes periodic operations on the peer, including message transmission +// and expiration. func (self *peer) update() { - relay := time.NewTicker(300 * time.Millisecond) -out: + // Start the tickers for the updates + expire := time.NewTicker(expirationTicks) + transmit := time.NewTicker(transmissionTicks) + + // Loop and transmit until termination is requested for { select { - case <-relay.C: - err := self.broadcast(self.host.envelopes()) - if err != nil { - self.peer.Infoln("broadcast err:", err) - break out + case <-expire.C: + self.expire() + + case <-transmit.C: + if err := self.broadcast(); err != nil { + self.peer.Infoln("broadcast failed:", err) + return } case <-self.quit: - break out + return } } } -func (self *peer) broadcast(envelopes []*Envelope) error { - envs := make([]*Envelope, 0, len(envelopes)) - for _, env := range envelopes { - if !self.known.Has(env.Hash()) { - envs = append(envs, env) - self.known.Add(env.Hash()) - } - } - if len(envs) > 0 { - if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { - return err - } - self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") - } - return nil +// mark marks an envelope known to the peer so that it won't be sent back. +func (self *peer) mark(envelope *Envelope) { + self.known.Add(envelope.Hash()) } -func (self *peer) addKnown(envelope *Envelope) { - self.known.Add(envelope.Hash()) +// marked checks if an envelope is already known to the remote peer. +func (self *peer) marked(envelope *Envelope) bool { + return self.known.Has(envelope.Hash()) } -func (self *peer) handleStatus() error { - ws := self.ws - if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { - return err - } - msg, err := ws.ReadMsg() - if err != nil { - return err - } - if msg.Code != statusMsg { - return fmt.Errorf("peer send %x before status msg", msg.Code) +// expire iterates over all the known envelopes in the host and removes all +// expired (unknown) ones from the known list. +func (self *peer) expire() { + // Assemble the list of available envelopes + available := set.NewNonTS() + for _, envelope := range self.host.envelopes() { + available.Add(envelope.Hash()) } - s := rlp.NewStream(msg.Payload) - if _, err := s.List(); err != nil { - return fmt.Errorf("bad status message: %v", err) + // Cross reference availability with known status + unmark := make(map[common.Hash]struct{}) + self.known.Each(func(v interface{}) bool { + if !available.Has(v.(common.Hash)) { + unmark[v.(common.Hash)] = struct{}{} + } + return true + }) + // Dump all known but unavailable + for hash, _ := range unmark { + self.known.Remove(hash) } - pv, err := s.Uint() - if err != nil { - return fmt.Errorf("bad status message: %v", err) +} + +// broadcast iterates over the collection of envelopes and transmits yet unknown +// ones over the network. +func (self *peer) broadcast() error { + // Fetch the envelopes and collect the unknown ones + envelopes := self.host.envelopes() + transmit := make([]*Envelope, 0, len(envelopes)) + for _, envelope := range envelopes { + if !self.marked(envelope) { + transmit = append(transmit, envelope) + self.mark(envelope) + } } - if pv != protocolVersion { - return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + // Transmit the unknown batch (potentially empty) + if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { + return err } - return msg.Discard() // ignore anything after protocol version + self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)") + + return nil } diff --git a/whisper/whisper.go b/whisper/whisper.go index f51f14a9f9..e56c457861 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -16,8 +16,8 @@ import ( ) const ( - statusMsg = 0x00 - envelopesMsg = 0x01 + statusCode = 0x00 + messagesCode = 0x01 protocolVersion uint64 = 0x02 protocolName = "shh" @@ -25,7 +25,8 @@ const ( signatureFlag = byte(1 << 7) signatureLength = 65 - expirationTicks = 800 * time.Millisecond + expirationTicks = 800 * time.Millisecond + transmissionTicks = 300 * time.Millisecond ) const ( @@ -69,7 +70,7 @@ func New() *Whisper { Name: protocolName, Version: uint(protocolVersion), Length: 2, - Run: whisper.msgHandler, + Run: whisper.handlePeer, } return whisper @@ -168,6 +169,40 @@ func (self *Whisper) Stop() { return }*/ +// handlePeer is called by the underlying P2P layer when the whisper sub-protocol +// connection is negotiated. +func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + // Create, initialize and start the whisper peer + whisperPeer, err := newPeer(self, peer, rw) + if err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + + // Read and process inbound messages directly to merge into client-global state + for { + // Fetch the next packet and decode the contained envelopes + packet, err := rw.ReadMsg() + if err != nil { + return err + } + var envelopes []*Envelope + if err := packet.Decode(&envelopes); err != nil { + peer.Infof("failed to decode enveloped: %v", err) + continue + } + // Inject all envelopes into the internal pool + for _, envelope := range envelopes { + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Debugf("failed to pool envelope: %f", err) + } + whisperPeer.mark(envelope) + } + } +} + // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. @@ -198,40 +233,6 @@ func (self *Whisper) add(envelope *Envelope) error { return nil } -// Main handler for passing whisper messages to whisper peer objects -func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { - wpeer := NewPeer(self, peer, ws) - // initialise whisper peer (handshake/status) - if err := wpeer.init(); err != nil { - return err - } - // kick of the main handler for broadcasting/managing envelopes - go wpeer.start() - defer wpeer.stop() - - // Main *read* loop. Writing is done by the peer it self. - for { - msg, err := ws.ReadMsg() - if err != nil { - return err - } - - var envelopes []*Envelope - if err := msg.Decode(&envelopes); err != nil { - peer.Infoln(err) - continue - } - - for _, envelope := range envelopes { - if err := self.add(envelope); err != nil { - // TODO Punish peer here. Invalid envelope. - peer.Debugln(err) - } - wpeer.addKnown(envelope) - } - } -} - // postEvent opens an envelope with the configured identities and delivers the // message upstream from application processing. func (self *Whisper) postEvent(envelope *Envelope) { From 1a4cfc173eb3f62c5859d25744f3c0de119e1b59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 15:02:31 +0300 Subject: [PATCH 09/20] whisper, xeth/whisper, ui/qt/qwhispe: fix API polish breakages --- ui/qt/qwhisper/whisper.go | 4 ++-- whisper/whisper.go | 31 +++++++++++++++---------------- xeth/whisper.go | 4 ++-- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go index 3c2d0a4b91..50b0626f56 100644 --- a/ui/qt/qwhisper/whisper.go +++ b/ui/qt/qwhisper/whisper.go @@ -41,7 +41,7 @@ func (self *Whisper) Post(payload []string, to, from string, topics []string, pr TTL: time.Duration(ttl) * time.Second, To: crypto.ToECDSAPub(common.FromHex(to)), From: key, - Topics: whisper.TopicsFromString(topics...), + Topics: whisper.NewTopicsFromStrings(topics...), }) if err != nil { @@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) { if topicList, ok := opts["topics"].(*qml.List); ok { var topics []string topicList.Convert(&topics) - f.Topics = whisper.TopicsFromString(topics...) + f.Topics = whisper.NewTopicsFromStrings(topics...) } return diff --git a/whisper/whisper.go b/whisper/whisper.go index e56c457861..2634a23a4a 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -144,6 +144,21 @@ func (self *Whisper) Stop() { glog.V(logger.Info).Infoln("Whisper stopped") } +// Messages retrieves the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []*Message { + messages := make([]*Message, 0) + if filter := self.filters.Get(id); filter != nil { + for _, envelope := range self.messages { + if message := self.open(envelope); message != nil { + if self.filters.Match(filter, createFilter(message, envelope.Topics)) { + messages = append(messages, message) + } + } + } + } + return messages +} + // func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { // k := string(crypto.FromECDSAPub(key)) // if _, ok := self.keys[k]; ok { @@ -153,22 +168,6 @@ func (self *Whisper) Stop() { // return false // } -/*func (self *Whisper) Messages(id int) (messages []*Message) { - filter := self.filters.Get(id) - if filter != nil { - for _, e := range self.messages { - if msg := self.open(e); msg != nil { - f := createFilter(msg, e.Topics) - if self.filters.Match(filter, f) { - messages = append(messages, msg) - } - } - } - } - - return -}*/ - // handlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/xeth/whisper.go b/xeth/whisper.go index 51caec8d62..342910b5c3 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -36,7 +36,7 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio TTL: time.Duration(ttl) * time.Second, To: crypto.ToECDSAPub(common.FromHex(to)), From: key, - Topics: whisper.TopicsFromString(topics...), + Topics: whisper.NewTopicsFromStrings(topics...), }) if err != nil { @@ -71,7 +71,7 @@ func (self *Whisper) Watch(opts *Options) int { filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(opts.To)), From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.TopicsFromString(opts.Topics...), + Topics: whisper.NewTopicsFromStrings(opts.Topics...), } var i int From f6efdd8aad96db24f29268b4393e95644dbbd18a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 15:16:02 +0300 Subject: [PATCH 10/20] whisper: shorten constants to TTL and PoW --- whisper/main.go | 4 ++-- whisper/message.go | 2 +- whisper/message_test.go | 8 ++++---- whisper/whisper.go | 4 ++-- whisper/whisper_test.go | 12 ++++++------ 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/whisper/main.go b/whisper/main.go index 422f0fa3bc..3c8c3801f7 100644 --- a/whisper/main.go +++ b/whisper/main.go @@ -69,10 +69,10 @@ func selfSend(shh *whisper.Whisper, payload []byte) error { }) // Wrap the payload and encrypt it msg := whisper.NewMessage(payload) - envelope, err := msg.Wrap(whisper.DefaultProofOfWork, whisper.Options{ + envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{ From: id, To: &id.PublicKey, - TTL: whisper.DefaultTimeToLive, + TTL: whisper.DefaultTTL, }) if err != nil { return fmt.Errorf("failed to seal message: %v", err) diff --git a/whisper/message.go b/whisper/message.go index a4de18f650..0513627b49 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -62,7 +62,7 @@ func NewMessage(payload []byte) *Message { func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) { // Use the default TTL if non was specified if options.TTL == 0 { - options.TTL = DefaultTimeToLive + options.TTL = DefaultTTL } // Sign and encrypt the message if requested if options.From != nil { diff --git a/whisper/message_test.go b/whisper/message_test.go index 319bc6025b..18a254e5c9 100644 --- a/whisper/message_test.go +++ b/whisper/message_test.go @@ -13,7 +13,7 @@ func TestMessageSimpleWrap(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil { + if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil { t.Fatalf("failed to wrap message: %v", err) } if msg.Flags&signatureFlag != 0 { @@ -36,7 +36,7 @@ func TestMessageCleartextSignRecover(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - if _, err := msg.Wrap(DefaultProofOfWork, Options{ + if _, err := msg.Wrap(DefaultPoW, Options{ From: key, }); err != nil { t.Fatalf("failed to sign message: %v", err) @@ -69,7 +69,7 @@ func TestMessageAnonymousEncryptDecrypt(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ To: &key.PublicKey, }) if err != nil { @@ -104,7 +104,7 @@ func TestMessageFullCrypto(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ From: fromKey, To: &toKey.PublicKey, }) diff --git a/whisper/whisper.go b/whisper/whisper.go index 2634a23a4a..b83e3f1c62 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -30,8 +30,8 @@ const ( ) const ( - DefaultTimeToLive = 50 * time.Second - DefaultProofOfWork = 50 * time.Millisecond + DefaultTTL = 50 * time.Second + DefaultPoW = 50 * time.Millisecond ) type MessageEvent struct { diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 3f903a9dc5..97644fd27d 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -83,10 +83,10 @@ func TestSelfMessage(t *testing.T) { }) // Send a dummy message to oneself msg := NewMessage([]byte("self whisper")) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ From: self, To: &self.PublicKey, - TTL: DefaultTimeToLive, + TTL: DefaultTTL, }) if err != nil { t.Fatalf("failed to wrap message: %v", err) @@ -126,10 +126,10 @@ func TestDirectMessage(t *testing.T) { }) // Send a dummy message from the sender msg := NewMessage([]byte("direct whisper")) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ From: senderId, To: &recipientId.PublicKey, - TTL: DefaultTimeToLive, + TTL: DefaultTTL, }) if err != nil { t.Fatalf("failed to wrap message: %v", err) @@ -184,9 +184,9 @@ func testBroadcast(anonymous bool, t *testing.T) { } // Send a dummy message from the sender msg := NewMessage([]byte("broadcast whisper")) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ Topics: NewTopicsFromStrings("broadcast topic"), - TTL: DefaultTimeToLive, + TTL: DefaultTTL, }) if err != nil { t.Fatalf("failed to wrap message: %v", err) From 86372b20c0995abe0343ce5f453be113e5f192d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 18:21:56 +0300 Subject: [PATCH 11/20] whisper: add basic tests for the whiper peers --- whisper/common_test.go | 38 +++++++++ whisper/peer_test.go | 183 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) create mode 100644 whisper/common_test.go create mode 100644 whisper/peer_test.go diff --git a/whisper/common_test.go b/whisper/common_test.go new file mode 100644 index 0000000000..8c12f8aebd --- /dev/null +++ b/whisper/common_test.go @@ -0,0 +1,38 @@ +// Contains some common utility functions for testing. + +package whisper + +import ( + "fmt" + "math/rand" + + "github.com/ethereum/go-ethereum/p2p" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// randomNodeID generates and returns a random P2P discovery node id for the +// whisper tests. +func randomNodeID() (id discover.NodeID) { + for i := range id { + id[i] = byte(rand.Intn(255)) + } + return id +} + +// randomNodeName generates and returns a random P2P node name for the whisper +// tests. +func randomNodeName() string { + return common.MakeName(fmt.Sprintf("whisper-go-test-%3d", rand.Intn(999)), "1.0") +} + +// whisperCaps returns the node capabilities for running the whisper sub-protocol. +func whisperCaps() []p2p.Cap { + return []p2p.Cap{ + p2p.Cap{ + Name: protocolName, + Version: uint(protocolVersion), + }, + } +} diff --git a/whisper/peer_test.go b/whisper/peer_test.go new file mode 100644 index 0000000000..c9092dd238 --- /dev/null +++ b/whisper/peer_test.go @@ -0,0 +1,183 @@ +package whisper + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" +) + +type testPeer struct { + client *Whisper + stream *p2p.MsgPipeRW + termed chan struct{} +} + +func startTestPeer() *testPeer { + // Create a simulated P2P remote peer and data streams to it + remote := p2p.NewPeer(randomNodeID(), randomNodeName(), whisperCaps()) + tester, tested := p2p.MsgPipe() + + // Create a whisper client and connect with it to the tester peer + client := New() + client.Start() + + termed := make(chan struct{}) + go func() { + defer client.Stop() + defer close(termed) + defer tested.Close() + + client.handlePeer(remote, tested) + }() + // Assemble and return the test peer + return &testPeer{ + client: client, + stream: tester, + termed: termed, + } +} + +func TestPeerStatusMessage(t *testing.T) { + tester := startTestPeer() + + // Wait for the handshake status message and check it + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Terminate the node + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerHandshakeFail(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send an invalid handshake status and verify disconnect + if err := p2p.SendItems(tester.stream, messagesCode); err != nil { + t.Fatalf("failed to send malformed status: %v", err) + } + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("remote close timed out") + } +} + +func TestPeerHandshakeSuccess(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send a valid handshake status and make sure connection stays live + if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { + t.Fatalf("failed to send status: %v", err) + } + select { + case <-tester.termed: + t.Fatalf("valid handshake disconnected") + + case <-time.After(100 * time.Millisecond): + } + // Clean up the test + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerSend(t *testing.T) { + // Start a tester and execute the handshake + tester := startTestPeer() + defer tester.stream.Close() + + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { + t.Fatalf("failed to send status: %v", err) + } + // Construct a message and inject into the tester + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + // Check that the message is eventually forwarded + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Make sure that even with a re-insert, an empty batch is received + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } +} + +func TestPeerDeliver(t *testing.T) { + // Start a tester and execute the handshake + tester := startTestPeer() + defer tester.stream.Close() + + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { + t.Fatalf("failed to send status: %v", err) + } + // Watch for all inbound messages + arrived := make(chan struct{}, 1) + tester.client.Watch(Filter{ + Fn: func(message *Message) { + arrived <- struct{}{} + }, + }) + // Construct a message and deliver it to the tester peer + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + // Check that the message is delivered upstream + select { + case <-arrived: + case <-time.After(time.Second): + t.Fatalf("message delivery timeout") + } + // Check that a resend is not delivered + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + select { + case <-time.After(2 * transmissionTicks): + case <-arrived: + t.Fatalf("repeating message arrived") + } +} From 4fb7ab5d090a49837ca50318fab468b056f2ec9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 19:00:57 +0300 Subject: [PATCH 12/20] whisper: mock tests to use simulated peers --- whisper/common_test.go | 32 ++++++++++++++ whisper/whisper_test.go | 94 ++++++++++------------------------------- 2 files changed, 55 insertions(+), 71 deletions(-) diff --git a/whisper/common_test.go b/whisper/common_test.go index 8c12f8aebd..4e221d6b1f 100644 --- a/whisper/common_test.go +++ b/whisper/common_test.go @@ -3,7 +3,9 @@ package whisper import ( + "bytes" "fmt" + "io/ioutil" "math/rand" "github.com/ethereum/go-ethereum/p2p" @@ -36,3 +38,33 @@ func whisperCaps() []p2p.Cap { }, } } + +// bufMsgPipe creates a buffered message pipe between two endpoints. +func bufMsgPipe() (*p2p.MsgPipeRW, *p2p.MsgPipeRW) { + A, midA := p2p.MsgPipe() + midB, B := p2p.MsgPipe() + + go copyMsgPipe(midA, midB) + go copyMsgPipe(midB, midA) + + return A, B +} + +// copyMsgPipe copies messages from the src pipe to the dest. +func copyMsgPipe(dst, src *p2p.MsgPipeRW) { + defer dst.Close() + for { + msg, err := src.ReadMsg() + if err != nil { + return + } + data, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return + } + msg.Payload = bytes.NewReader(data) + if err := dst.WriteMsg(msg); err != nil { + return + } + } +} diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 97644fd27d..df27a945ec 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -1,75 +1,36 @@ package whisper import ( - "fmt" "testing" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/nat" ) -type testNode struct { - server *p2p.Server - client *Whisper -} - -func startNodes(n int) ([]*testNode, error) { - // Start up the cluster of nodes - cluster := make([]*testNode, 0, n) +func startTestCluster(n int) []*Whisper { + // Create the batch of simulated peers + nodes := make([]*p2p.Peer, n) for i := 0; i < n; i++ { - shh := New() - - // Generate the node identity - key, err := crypto.GenerateKey() - if err != nil { - return nil, err - } - name := common.MakeName(fmt.Sprintf("whisper-go-test-%d", i), "1.0") - - // Create an Ethereum server to communicate through - server := &p2p.Server{ - PrivateKey: key, - MaxPeers: 10, - Name: name, - Protocols: []p2p.Protocol{shh.Protocol()}, - ListenAddr: fmt.Sprintf(":%d", 30300+i), - NAT: nat.Any(), - } - if err := server.Start(); err != nil { - return nil, err - } - // Peer online, store and iterate - cluster = append(cluster, &testNode{ - server: server, - client: shh, - }) + nodes[i] = p2p.NewPeer(randomNodeID(), randomNodeName(), whisperCaps()) } - // Manually wire together the cluster nodes - root := cluster[0].server.Self() - for _, node := range cluster[1:] { - node.server.SuggestPeer(root) + whispers := make([]*Whisper, n) + for i := 0; i < n; i++ { + whispers[i] = New() + whispers[i].Start() } - return cluster, nil -} + // Wire all the peers to the root one + for i := 1; i < n; i++ { + src, dst := bufMsgPipe() -func stopNodes(cluster []*testNode) { - for _, node := range cluster { - node.server.Stop() + go whispers[0].handlePeer(nodes[i], src) + go whispers[i].handlePeer(nodes[0], dst) } + return whispers } func TestSelfMessage(t *testing.T) { // Start the single node cluster - cluster, err := startNodes(1) - if err != nil { - t.Fatalf("failed to boot test cluster: %v", err) - } - defer stopNodes(cluster) - - client := cluster[0].client + client := startTestCluster(1)[0] // Start watching for self messages, signal any arrivals self := client.NewIdentity() @@ -104,16 +65,12 @@ func TestSelfMessage(t *testing.T) { func TestDirectMessage(t *testing.T) { // Start the sender-recipient cluster - cluster, err := startNodes(2) - if err != nil { - t.Fatalf("failed to boot test cluster: %v", err) - } - defer stopNodes(cluster) + cluster := startTestCluster(2) - sender := cluster[0].client + sender := cluster[0] senderId := sender.NewIdentity() - recipient := cluster[1].client + recipient := cluster[1] recipientId := recipient.NewIdentity() // Watch for arriving messages on the recipient @@ -155,18 +112,13 @@ func TestIdentifiedBroadcast(t *testing.T) { func testBroadcast(anonymous bool, t *testing.T) { // Start the single sender multi recipient cluster - cluster, err := startNodes(3) - if err != nil { - t.Fatalf("failed to boot test cluster: %v", err) - } - defer stopNodes(cluster) + cluster := startTestCluster(3) - sender := cluster[0].client - targets := make([]*Whisper, len(cluster)-1) - for i, node := range cluster[1:] { - targets[i] = node.client + sender := cluster[1] + targets := cluster[1:] + for _, target := range targets { if !anonymous { - targets[i].NewIdentity() + target.NewIdentity() } } // Watch for arriving messages on the recipients From bcf41797cacac35879d6bc153d3e4ce3cd9896f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 15 Apr 2015 10:50:31 +0300 Subject: [PATCH 13/20] whisper: global message expiration tests, polishes --- whisper/common_test.go | 3 +-- whisper/peer_test.go | 36 ++++++++++++++++++++++-------------- whisper/whisper_test.go | 26 ++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 16 deletions(-) diff --git a/whisper/common_test.go b/whisper/common_test.go index 4e221d6b1f..76e6463e68 100644 --- a/whisper/common_test.go +++ b/whisper/common_test.go @@ -8,9 +8,8 @@ import ( "io/ioutil" "math/rand" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" ) diff --git a/whisper/peer_test.go b/whisper/peer_test.go index c9092dd238..5f24504da5 100644 --- a/whisper/peer_test.go +++ b/whisper/peer_test.go @@ -38,6 +38,20 @@ func startTestPeer() *testPeer { } } +func startTestPeerInited() (*testPeer, error) { + peer := startTestPeer() + + if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil { + peer.stream.Close() + return nil, err + } + if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil { + peer.stream.Close() + return nil, err + } + return peer, nil +} + func TestPeerStatusMessage(t *testing.T) { tester := startTestPeer() @@ -102,15 +116,12 @@ func TestPeerHandshakeSuccess(t *testing.T) { func TestPeerSend(t *testing.T) { // Start a tester and execute the handshake - tester := startTestPeer() + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } defer tester.stream.Close() - if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { - t.Fatalf("status message mismatch: %v", err) - } - if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { - t.Fatalf("failed to send status: %v", err) - } // Construct a message and inject into the tester message := NewMessage([]byte("peer broadcast test message")) envelope, err := message.Wrap(DefaultPoW, Options{ @@ -138,15 +149,12 @@ func TestPeerSend(t *testing.T) { func TestPeerDeliver(t *testing.T) { // Start a tester and execute the handshake - tester := startTestPeer() + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } defer tester.stream.Close() - if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { - t.Fatalf("status message mismatch: %v", err) - } - if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { - t.Fatalf("failed to send status: %v", err) - } // Watch for all inbound messages arrived := make(chan struct{}, 1) tester.client.Watch(Filter{ diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index df27a945ec..c072ba26b3 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -156,3 +156,29 @@ func testBroadcast(anonymous bool, t *testing.T) { } } } + +func TestMessageExpiration(t *testing.T) { + // Start the single node cluster and inject a dummy message + node := startTestCluster(1)[0] + + message := NewMessage([]byte("expiring message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: time.Second, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := node.Send(envelope); err != nil { + t.Fatalf("failed to inject message: %v", err) + } + // Check that the message is inside the cache + if _, ok := node.messages[envelope.Hash()]; !ok { + t.Fatalf("message not found in cache") + } + // Wait for expiration and check cache again + time.Sleep(time.Second) // wait for expiration + time.Sleep(expirationTicks) // wait for cleanup cycle + if _, ok := node.messages[envelope.Hash()]; ok { + t.Fatalf("message not expired from cache") + } +} From 46ea193a49f60bb54cd5fc083adcc6fdf58dbdaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 15 Apr 2015 12:50:10 +0300 Subject: [PATCH 14/20] whisper: remove some unneeded testing complexity --- whisper/common_test.go | 29 ----------------------------- whisper/peer_test.go | 5 +++-- whisper/whisper_test.go | 3 ++- 3 files changed, 5 insertions(+), 32 deletions(-) diff --git a/whisper/common_test.go b/whisper/common_test.go index 76e6463e68..a5df762e12 100644 --- a/whisper/common_test.go +++ b/whisper/common_test.go @@ -4,40 +4,11 @@ package whisper import ( "bytes" - "fmt" "io/ioutil" - "math/rand" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" ) -// randomNodeID generates and returns a random P2P discovery node id for the -// whisper tests. -func randomNodeID() (id discover.NodeID) { - for i := range id { - id[i] = byte(rand.Intn(255)) - } - return id -} - -// randomNodeName generates and returns a random P2P node name for the whisper -// tests. -func randomNodeName() string { - return common.MakeName(fmt.Sprintf("whisper-go-test-%3d", rand.Intn(999)), "1.0") -} - -// whisperCaps returns the node capabilities for running the whisper sub-protocol. -func whisperCaps() []p2p.Cap { - return []p2p.Cap{ - p2p.Cap{ - Name: protocolName, - Version: uint(protocolVersion), - }, - } -} - // bufMsgPipe creates a buffered message pipe between two endpoints. func bufMsgPipe() (*p2p.MsgPipeRW, *p2p.MsgPipeRW) { A, midA := p2p.MsgPipe() diff --git a/whisper/peer_test.go b/whisper/peer_test.go index 5f24504da5..53aff7c554 100644 --- a/whisper/peer_test.go +++ b/whisper/peer_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" ) type testPeer struct { @@ -15,7 +16,7 @@ type testPeer struct { func startTestPeer() *testPeer { // Create a simulated P2P remote peer and data streams to it - remote := p2p.NewPeer(randomNodeID(), randomNodeName(), whisperCaps()) + remote := p2p.NewPeer(discover.NodeID{}, "", nil) tester, tested := p2p.MsgPipe() // Create a whisper client and connect with it to the tester peer @@ -30,7 +31,7 @@ func startTestPeer() *testPeer { client.handlePeer(remote, tested) }() - // Assemble and return the test peer + return &testPeer{ client: client, stream: tester, diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index c072ba26b3..35e2f05245 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -5,13 +5,14 @@ import ( "time" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" ) func startTestCluster(n int) []*Whisper { // Create the batch of simulated peers nodes := make([]*p2p.Peer, n) for i := 0; i < n; i++ { - nodes[i] = p2p.NewPeer(randomNodeID(), randomNodeName(), whisperCaps()) + nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil) } whispers := make([]*Whisper, n) for i := 0; i < n; i++ { From 6ceb253f743ec0d2bdd9a676c7f365de2201470c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 15 Apr 2015 13:01:22 +0300 Subject: [PATCH 15/20] whisper: use async handshakes to handle blocking peers --- whisper/common_test.go | 40 ---------------------------------------- whisper/peer.go | 17 +++++++++++------ whisper/whisper_test.go | 2 +- 3 files changed, 12 insertions(+), 47 deletions(-) delete mode 100644 whisper/common_test.go diff --git a/whisper/common_test.go b/whisper/common_test.go deleted file mode 100644 index a5df762e12..0000000000 --- a/whisper/common_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Contains some common utility functions for testing. - -package whisper - -import ( - "bytes" - "io/ioutil" - - "github.com/ethereum/go-ethereum/p2p" -) - -// bufMsgPipe creates a buffered message pipe between two endpoints. -func bufMsgPipe() (*p2p.MsgPipeRW, *p2p.MsgPipeRW) { - A, midA := p2p.MsgPipe() - midB, B := p2p.MsgPipe() - - go copyMsgPipe(midA, midB) - go copyMsgPipe(midB, midA) - - return A, B -} - -// copyMsgPipe copies messages from the src pipe to the dest. -func copyMsgPipe(dst, src *p2p.MsgPipeRW) { - defer dst.Close() - for { - msg, err := src.ReadMsg() - if err != nil { - return - } - data, err := ioutil.ReadAll(msg.Payload) - if err != nil { - return - } - msg.Payload = bytes.NewReader(data) - if err := dst.WriteMsg(msg); err != nil { - return - } - } -} diff --git a/whisper/peer.go b/whisper/peer.go index f077dbe70b..8bf8488557 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -53,10 +53,12 @@ func (self *peer) stop() { // handshake sends the protocol initiation status message to the remote peer and // verifies the remote status too. func (self *peer) handshake() error { - // Send own status message, fetch remote one - if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { - return err - } + // Send the handshake status message asynchronously + errc := make(chan error, 1) + go func() { + errc <- p2p.SendItems(self.ws, statusCode, protocolVersion) + }() + // Fetch the remote status packet and verify protocol match packet, err := self.ws.ReadMsg() if err != nil { return err @@ -64,7 +66,6 @@ func (self *peer) handshake() error { if packet.Code != statusCode { return fmt.Errorf("peer sent %x before status packet", packet.Code) } - // Decode the rest of the status packet and verify protocol match s := rlp.NewStream(packet.Payload) if _, err := s.List(); err != nil { return fmt.Errorf("bad status message: %v", err) @@ -76,7 +77,11 @@ func (self *peer) handshake() error { if peerVersion != protocolVersion { return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) } - return packet.Discard() // ignore anything after protocol version + // Wait until out own status is consumed too + if err := <-errc; err != nil { + return fmt.Errorf("failed to send status packet: %v", err) + } + return nil } // update executes periodic operations on the peer, including message transmission diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 35e2f05245..554a12cb11 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -21,7 +21,7 @@ func startTestCluster(n int) []*Whisper { } // Wire all the peers to the root one for i := 1; i < n; i++ { - src, dst := bufMsgPipe() + src, dst := p2p.MsgPipe() go whispers[0].handlePeer(nodes[i], src) go whispers[i].handlePeer(nodes[0], dst) From ee6531c5ff712307325e8866b73397179f4bb8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 16 Apr 2015 11:20:01 +0300 Subject: [PATCH 16/20] whisper: remove dead code, rename a few constants --- whisper/peer.go | 4 ++-- whisper/peer_test.go | 2 +- whisper/sort.go | 29 ----------------------------- whisper/sort_test.go | 23 ----------------------- whisper/whisper.go | 6 +++--- whisper/whisper_test.go | 2 +- 6 files changed, 7 insertions(+), 59 deletions(-) delete mode 100644 whisper/sort.go delete mode 100644 whisper/sort_test.go diff --git a/whisper/peer.go b/whisper/peer.go index 8bf8488557..e4301f37c3 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -88,8 +88,8 @@ func (self *peer) handshake() error { // and expiration. func (self *peer) update() { // Start the tickers for the updates - expire := time.NewTicker(expirationTicks) - transmit := time.NewTicker(transmissionTicks) + expire := time.NewTicker(expirationCycle) + transmit := time.NewTicker(transmissionCycle) // Loop and transmit until termination is requested for { diff --git a/whisper/peer_test.go b/whisper/peer_test.go index 53aff7c554..de67b2463a 100644 --- a/whisper/peer_test.go +++ b/whisper/peer_test.go @@ -185,7 +185,7 @@ func TestPeerDeliver(t *testing.T) { t.Fatalf("failed to transfer message: %v", err) } select { - case <-time.After(2 * transmissionTicks): + case <-time.After(2 * transmissionCycle): case <-arrived: t.Fatalf("repeating message arrived") } diff --git a/whisper/sort.go b/whisper/sort.go deleted file mode 100644 index 313ba5ac0a..0000000000 --- a/whisper/sort.go +++ /dev/null @@ -1,29 +0,0 @@ -package whisper - -import ( - "sort" - - "github.com/ethereum/go-ethereum/common" -) - -type sortedKeys struct { - k []int32 -} - -func (self *sortedKeys) Len() int { return len(self.k) } -func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } -func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } - -func sortKeys(m map[int32]common.Hash) []int32 { - sorted := new(sortedKeys) - sorted.k = make([]int32, len(m)) - i := 0 - for key, _ := range m { - sorted.k[i] = key - i++ - } - - sort.Sort(sorted) - - return sorted.k -} diff --git a/whisper/sort_test.go b/whisper/sort_test.go deleted file mode 100644 index a61fde4c2d..0000000000 --- a/whisper/sort_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package whisper - -import ( - "testing" - - "github.com/ethereum/go-ethereum/common" -) - -func TestSorting(t *testing.T) { - m := map[int32]common.Hash{ - 1: {1}, - 3: {3}, - 2: {2}, - 5: {5}, - } - exp := []int32{1, 2, 3, 5} - res := sortKeys(m) - for i, k := range res { - if k != exp[i] { - t.Error(k, "failed. Expected", exp[i]) - } - } -} diff --git a/whisper/whisper.go b/whisper/whisper.go index b83e3f1c62..f04075e1f4 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -25,8 +25,8 @@ const ( signatureFlag = byte(1 << 7) signatureLength = 65 - expirationTicks = 800 * time.Millisecond - transmissionTicks = 300 * time.Millisecond + expirationCycle = 800 * time.Millisecond + transmissionCycle = 300 * time.Millisecond ) const ( @@ -275,7 +275,7 @@ func createFilter(message *Message, topics []Topic) filter.Filter { // state by expiring stale messages from the pool. func (self *Whisper) update() { // Start a ticker to check for expirations - expire := time.NewTicker(expirationTicks) + expire := time.NewTicker(expirationCycle) // Repeat updates until termination is requested for { diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 554a12cb11..def8e68d8b 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -178,7 +178,7 @@ func TestMessageExpiration(t *testing.T) { } // Wait for expiration and check cache again time.Sleep(time.Second) // wait for expiration - time.Sleep(expirationTicks) // wait for cleanup cycle + time.Sleep(expirationCycle) // wait for cleanup cycle if _, ok := node.messages[envelope.Hash()]; ok { t.Fatalf("message not expired from cache") } From e5e91e9eb394e7b495666834586073052dc58265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 16 Apr 2015 13:05:35 +0300 Subject: [PATCH 17/20] whisper: track active peers, add peer cache expiry test --- whisper/peer_test.go | 50 ++++++++++++++++++++++++++++++++++++++ whisper/whisper.go | 58 +++++++++++++++++++++++++++----------------- 2 files changed, 86 insertions(+), 22 deletions(-) diff --git a/whisper/peer_test.go b/whisper/peer_test.go index de67b2463a..9008cdc593 100644 --- a/whisper/peer_test.go +++ b/whisper/peer_test.go @@ -190,3 +190,53 @@ func TestPeerDeliver(t *testing.T) { t.Fatalf("repeating message arrived") } } + +func TestPeerMessageExpiration(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Fetch the peer instance for later inspection + tester.client.peerMu.RLock() + if peers := len(tester.client.peers); peers != 1 { + t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1) + } + var peer *peer + for peer, _ = range tester.client.peers { + break + } + tester.client.peerMu.RUnlock() + + // Construct a message and pass it through the tester + message := NewMessage([]byte("peer test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: time.Second, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Check that the message is inside the cache + if !peer.known.Has(envelope.Hash()) { + t.Fatalf("message not found in cache") + } + // Discard messages until expiration and check cache again + exp := time.Now().Add(time.Second + expirationCycle) + for time.Now().Before(exp) { + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } + } + if peer.known.Has(envelope.Hash()) { + t.Fatalf("message not expired from cache") + } +} diff --git a/whisper/whisper.go b/whisper/whisper.go index f04075e1f4..48efff6229 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -46,22 +46,26 @@ type Whisper struct { protocol p2p.Protocol filters *filter.Filters - mmu sync.RWMutex // Message mutex to sync the below pool - messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node - expiry map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) + keys map[string]*ecdsa.PrivateKey - quit chan struct{} + messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node + expirations map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) + poolMu sync.RWMutex // Mutex to sync the message and expiration pools - keys map[string]*ecdsa.PrivateKey + peers map[*peer]struct{} // Set of currently active peers + peerMu sync.RWMutex // Mutex to sync the active peer set + + quit chan struct{} } func New() *Whisper { whisper := &Whisper{ - messages: make(map[common.Hash]*Envelope), - filters: filter.New(), - expiry: make(map[uint32]*set.SetNonTS), - quit: make(chan struct{}), - keys: make(map[string]*ecdsa.PrivateKey), + filters: filter.New(), + keys: make(map[string]*ecdsa.PrivateKey), + messages: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*peer]struct{}), + quit: make(chan struct{}), } whisper.filters.Start() @@ -179,6 +183,16 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { whisperPeer.start() defer whisperPeer.stop() + // Start tracking the active peer + self.peerMu.Lock() + self.peers[whisperPeer] = struct{}{} + self.peerMu.Unlock() + + defer func() { + self.peerMu.Lock() + delete(self.peers, whisperPeer) + self.peerMu.Unlock() + }() // Read and process inbound messages directly to merge into client-global state for { // Fetch the next packet and decode the contained envelopes @@ -206,8 +220,8 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. func (self *Whisper) add(envelope *Envelope) error { - self.mmu.Lock() - defer self.mmu.Unlock() + self.poolMu.Lock() + defer self.poolMu.Unlock() // Insert the message into the tracked pool hash := envelope.Hash() @@ -218,11 +232,11 @@ func (self *Whisper) add(envelope *Envelope) error { self.messages[hash] = envelope // Insert the message into the expiration pool for later removal - if self.expiry[envelope.Expiry] == nil { - self.expiry[envelope.Expiry] = set.NewNonTS() + if self.expirations[envelope.Expiry] == nil { + self.expirations[envelope.Expiry] = set.NewNonTS() } - if !self.expiry[envelope.Expiry].Has(hash) { - self.expiry[envelope.Expiry].Add(hash) + if !self.expirations[envelope.Expiry].Has(hash) { + self.expirations[envelope.Expiry].Add(hash) // Notify the local node of a message arrival go self.postEvent(envelope) @@ -292,11 +306,11 @@ func (self *Whisper) update() { // expire iterates over all the expiration timestamps, removing all stale // messages from the pools. func (self *Whisper) expire() { - self.mmu.Lock() - defer self.mmu.Unlock() + self.poolMu.Lock() + defer self.poolMu.Unlock() now := uint32(time.Now().Unix()) - for then, hashSet := range self.expiry { + for then, hashSet := range self.expirations { // Short circuit if a future time if then > now { continue @@ -306,14 +320,14 @@ func (self *Whisper) expire() { delete(self.messages, v.(common.Hash)) return true }) - self.expiry[then].Clear() + self.expirations[then].Clear() } } // envelopes retrieves all the messages currently pooled by the node. func (self *Whisper) envelopes() []*Envelope { - self.mmu.RLock() - defer self.mmu.RUnlock() + self.poolMu.RLock() + defer self.poolMu.RUnlock() envelopes := make([]*Envelope, 0, len(self.messages)) for _, envelope := range self.messages { From e5a03eb0661a70c9a068fdf85f1cb31effbf26f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 16 Apr 2015 18:24:39 +0300 Subject: [PATCH 18/20] whisper: don't issue signature warning if none present --- whisper/message.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/whisper/message.go b/whisper/message.go index 0513627b49..07c6735677 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -94,6 +94,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { func (self *Message) Recover() *ecdsa.PublicKey { defer func() { recover() }() // in case of invalid signature + // Short circuit if no signature is present + if self.Signature == nil { + return nil + } + // Otherwise try and recover the signature pub, err := crypto.SigToPub(self.hash(), self.Signature) if err != nil { glog.V(logger.Error).Infof("Could not get public key from signature: %v", err) From bd14bd6c5b8fcfb2430720da58de152dbbcb84ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 17 Apr 2015 13:25:18 +0300 Subject: [PATCH 19/20] whisper: hide some internal types --- whisper/topic.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/whisper/topic.go b/whisper/topic.go index 7792e437fa..7c26cfba9b 100644 --- a/whisper/topic.go +++ b/whisper/topic.go @@ -49,13 +49,13 @@ func (self *Topic) String() string { } // TopicSet represents a hash set to check if a topic exists or not. -type TopicSet map[string]struct{} +type topicSet map[string]struct{} // NewTopicSet creates a topic hash set from a slice of topics. -func NewTopicSet(topics []Topic) TopicSet { +func NewTopicSet(topics []Topic) topicSet { set := make(map[string]struct{}) for _, topic := range topics { set[topic.String()] = struct{}{} } - return TopicSet(set) + return topicSet(set) } From 4afc22ba6e79fe951256251444b4a5b8d38b50fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 17 Apr 2015 14:11:46 +0300 Subject: [PATCH 20/20] whisper: cleanup lefover scoping --- whisper/topic.go | 2 +- whisper/topic_test.go | 2 +- whisper/whisper.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/whisper/topic.go b/whisper/topic.go index 7c26cfba9b..a965c7cc2a 100644 --- a/whisper/topic.go +++ b/whisper/topic.go @@ -52,7 +52,7 @@ func (self *Topic) String() string { type topicSet map[string]struct{} // NewTopicSet creates a topic hash set from a slice of topics. -func NewTopicSet(topics []Topic) topicSet { +func newTopicSet(topics []Topic) topicSet { set := make(map[string]struct{}) for _, topic := range topics { set[topic.String()] = struct{}{} diff --git a/whisper/topic_test.go b/whisper/topic_test.go index 5f85839872..4015079dcf 100644 --- a/whisper/topic_test.go +++ b/whisper/topic_test.go @@ -57,7 +57,7 @@ func TestTopicSetCreation(t *testing.T) { for i, tt := range topicCreationTests { topics[i] = NewTopic(tt.data) } - set := NewTopicSet(topics) + set := newTopicSet(topics) for i, tt := range topicCreationTests { topic := NewTopic(tt.data) if _, ok := set[topic.String()]; !ok { diff --git a/whisper/whisper.go b/whisper/whisper.go index 48efff6229..9317fad50e 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -119,7 +119,7 @@ func (self *Whisper) Watch(options Filter) int { filter := filter.Generic{ Str1: string(crypto.FromECDSAPub(options.To)), Str2: string(crypto.FromECDSAPub(options.From)), - Data: NewTopicSet(options.Topics), + Data: newTopicSet(options.Topics), Fn: func(data interface{}) { options.Fn(data.(*Message)) }, @@ -281,7 +281,7 @@ func createFilter(message *Message, topics []Topic) filter.Filter { return filter.Generic{ Str1: string(crypto.FromECDSAPub(message.To)), Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: NewTopicSet(topics), + Data: newTopicSet(topics), } }