whisper: fix anonymous broadcast drop, add broadcast tests

pull/718/head
Péter Szilágyi 10 years ago
parent 4af7743663
commit 5205b2f19b
  1. 7
      whisper/envelope.go
  2. 46
      whisper/whisper.go
  3. 70
      whisper/whisper_test.go

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
@ -96,10 +97,14 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
if key == nil { if key == nil {
return message, nil return message, nil
} }
switch message.decrypt(key) { err = message.decrypt(key)
switch err {
case nil: case nil:
return message, nil return message, nil
case ecies.ErrInvalidPublicKey: // Payload isn't encrypted
return message, err
default: default:
return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err) return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err)
} }

@ -136,8 +136,8 @@ func (self *Whisper) Messages(id int) (messages []*Message) {
filter := self.filters.Get(id) filter := self.filters.Get(id)
if filter != nil { if filter != nil {
for _, e := range self.messages { for _, e := range self.messages {
if msg, key := self.open(e); msg != nil { if msg := self.open(e); msg != nil {
f := createFilter(msg, e.Topics, key) f := createFilter(msg, e.Topics)
if self.filters.Match(filter, f) { if self.filters.Match(filter, f) {
messages = append(messages, msg) messages = append(messages, msg)
} }
@ -251,31 +251,45 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) {
return return
} }
func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol
}
// postEvent opens an envelope with the configured identities and delivers the
// message upstream from application processing.
func (self *Whisper) postEvent(envelope *Envelope) { func (self *Whisper) postEvent(envelope *Envelope) {
if message, key := self.open(envelope); message != nil { if message := self.open(envelope); message != nil {
self.filters.Notify(createFilter(message, envelope.Topics, key), message) self.filters.Notify(createFilter(message, envelope.Topics), message)
} }
} }
func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) { // open tries to decrypt a whisper envelope with all the configured identities,
// returning the decrypted message and the key used to achieve it. If not keys
// are configured, open will return the payload as if non encrypted.
func (self *Whisper) open(envelope *Envelope) *Message {
// Short circuit if no identity is set, and assume clear-text
if len(self.keys) == 0 {
if message, err := envelope.Open(nil); err == nil {
return message
}
}
// Iterate over the keys and try to decrypt the message
for _, key := range self.keys { for _, key := range self.keys {
if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) { message, err := envelope.Open(key)
if err == nil || err == ecies.ErrInvalidPublicKey {
message.To = &key.PublicKey message.To = &key.PublicKey
return message
return message, key
} }
} }
// Failed to decrypt, don't return anything
return nil, nil return nil
}
func (self *Whisper) Protocol() p2p.Protocol {
return self.protocol
} }
func createFilter(message *Message, topics []Topic, key *ecdsa.PrivateKey) filter.Filter { // createFilter creates a message filter to check against installed handlers.
func createFilter(message *Message, topics []Topic) filter.Filter {
return filter.Generic{ return filter.Generic{
Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())), Str1: string(crypto.FromECDSAPub(message.To)),
Str2: string(crypto.FromECDSAPub(message.Recover())),
Data: NewTopicSet(topics), Data: NewTopicSet(topics),
} }
} }

@ -7,7 +7,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/p2p/nat"
) )
@ -83,7 +82,7 @@ func TestSelfMessage(t *testing.T) {
}, },
}) })
// Send a dummy message to oneself // Send a dummy message to oneself
msg := NewMessage([]byte("hello whisper")) msg := NewMessage([]byte("self whisper"))
envelope, err := msg.Wrap(DefaultProofOfWork, Options{ envelope, err := msg.Wrap(DefaultProofOfWork, Options{
From: self, From: self,
To: &self.PublicKey, To: &self.PublicKey,
@ -104,9 +103,6 @@ func TestSelfMessage(t *testing.T) {
} }
func TestDirectMessage(t *testing.T) { func TestDirectMessage(t *testing.T) {
glog.SetV(6)
glog.SetToStderr(true)
// Start the sender-recipient cluster // Start the sender-recipient cluster
cluster, err := startNodes(2) cluster, err := startNodes(2)
if err != nil { if err != nil {
@ -129,7 +125,7 @@ func TestDirectMessage(t *testing.T) {
}, },
}) })
// Send a dummy message from the sender // Send a dummy message from the sender
msg := NewMessage([]byte("hello whisper")) msg := NewMessage([]byte("direct whisper"))
envelope, err := msg.Wrap(DefaultProofOfWork, Options{ envelope, err := msg.Wrap(DefaultProofOfWork, Options{
From: senderId, From: senderId,
To: &recipientId.PublicKey, To: &recipientId.PublicKey,
@ -139,7 +135,7 @@ func TestDirectMessage(t *testing.T) {
t.Fatalf("failed to wrap message: %v", err) t.Fatalf("failed to wrap message: %v", err)
} }
if err := sender.Send(envelope); err != nil { if err := sender.Send(envelope); err != nil {
t.Fatalf("failed to send direct message: %v", err) t.Fatalf("failed to send direct message: %v", err)
} }
// Wait for an arrival or a timeout // Wait for an arrival or a timeout
select { select {
@ -148,3 +144,63 @@ func TestDirectMessage(t *testing.T) {
t.Fatalf("direct message receive timeout") t.Fatalf("direct message receive timeout")
} }
} }
func TestAnonymousBroadcast(t *testing.T) {
testBroadcast(true, t)
}
func TestIdentifiedBroadcast(t *testing.T) {
testBroadcast(false, t)
}
func testBroadcast(anonymous bool, t *testing.T) {
// Start the single sender multi recipient cluster
cluster, err := startNodes(3)
if err != nil {
t.Fatalf("failed to boot test cluster: %v", err)
}
defer stopNodes(cluster)
sender := cluster[0].client
targets := make([]*Whisper, len(cluster)-1)
for i, node := range cluster[1:] {
targets[i] = node.client
if !anonymous {
targets[i].NewIdentity()
}
}
// Watch for arriving messages on the recipients
dones := make([]chan struct{}, len(targets))
for i := 0; i < len(targets); i++ {
done := make(chan struct{}) // need for the closure
dones[i] = done
targets[i].Watch(Filter{
Topics: NewTopicsFromStrings("broadcast topic"),
Fn: func(msg *Message) {
close(done)
},
})
}
// Send a dummy message from the sender
msg := NewMessage([]byte("broadcast whisper"))
envelope, err := msg.Wrap(DefaultProofOfWork, Options{
Topics: NewTopicsFromStrings("broadcast topic"),
TTL: DefaultTimeToLive,
})
if err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
if err := sender.Send(envelope); err != nil {
t.Fatalf("failed to send broadcast message: %v", err)
}
// Wait for an arrival on each recipient, or timeouts
timeout := time.After(time.Second)
for _, done := range dones {
select {
case <-done:
case <-timeout:
t.Fatalf("broadcast message receive timeout")
}
}
}

Loading…
Cancel
Save