mirror of https://github.com/ethereum/go-ethereum
commit
ff67fbf964
@ -1,10 +1,13 @@ |
|||||||
|
// Contains the message filter for fine grained subscriptions.
|
||||||
|
|
||||||
package whisper |
package whisper |
||||||
|
|
||||||
import "crypto/ecdsa" |
import "crypto/ecdsa" |
||||||
|
|
||||||
|
// Filter is used to subscribe to specific types of whisper messages.
|
||||||
type Filter struct { |
type Filter struct { |
||||||
To *ecdsa.PublicKey |
To *ecdsa.PublicKey // Recipient of the message
|
||||||
From *ecdsa.PublicKey |
From *ecdsa.PublicKey // Sender of the message
|
||||||
Topics [][]byte |
Topics []Topic // Topics to watch messages on
|
||||||
Fn func(*Message) |
Fn func(*Message) // Handler in case of a match
|
||||||
} |
} |
||||||
|
@ -0,0 +1,242 @@ |
|||||||
|
package whisper |
||||||
|
|
||||||
|
import ( |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/p2p" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover" |
||||||
|
) |
||||||
|
|
||||||
|
type testPeer struct { |
||||||
|
client *Whisper |
||||||
|
stream *p2p.MsgPipeRW |
||||||
|
termed chan struct{} |
||||||
|
} |
||||||
|
|
||||||
|
func startTestPeer() *testPeer { |
||||||
|
// Create a simulated P2P remote peer and data streams to it
|
||||||
|
remote := p2p.NewPeer(discover.NodeID{}, "", nil) |
||||||
|
tester, tested := p2p.MsgPipe() |
||||||
|
|
||||||
|
// Create a whisper client and connect with it to the tester peer
|
||||||
|
client := New() |
||||||
|
client.Start() |
||||||
|
|
||||||
|
termed := make(chan struct{}) |
||||||
|
go func() { |
||||||
|
defer client.Stop() |
||||||
|
defer close(termed) |
||||||
|
defer tested.Close() |
||||||
|
|
||||||
|
client.handlePeer(remote, tested) |
||||||
|
}() |
||||||
|
|
||||||
|
return &testPeer{ |
||||||
|
client: client, |
||||||
|
stream: tester, |
||||||
|
termed: termed, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func startTestPeerInited() (*testPeer, error) { |
||||||
|
peer := startTestPeer() |
||||||
|
|
||||||
|
if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil { |
||||||
|
peer.stream.Close() |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil { |
||||||
|
peer.stream.Close() |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
return peer, nil |
||||||
|
} |
||||||
|
|
||||||
|
func TestPeerStatusMessage(t *testing.T) { |
||||||
|
tester := startTestPeer() |
||||||
|
|
||||||
|
// Wait for the handshake status message and check it
|
||||||
|
if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { |
||||||
|
t.Fatalf("status message mismatch: %v", err) |
||||||
|
} |
||||||
|
// Terminate the node
|
||||||
|
tester.stream.Close() |
||||||
|
|
||||||
|
select { |
||||||
|
case <-tester.termed: |
||||||
|
case <-time.After(time.Second): |
||||||
|
t.Fatalf("local close timed out") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestPeerHandshakeFail(t *testing.T) { |
||||||
|
tester := startTestPeer() |
||||||
|
|
||||||
|
// Wait for and check the handshake
|
||||||
|
if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { |
||||||
|
t.Fatalf("status message mismatch: %v", err) |
||||||
|
} |
||||||
|
// Send an invalid handshake status and verify disconnect
|
||||||
|
if err := p2p.SendItems(tester.stream, messagesCode); err != nil { |
||||||
|
t.Fatalf("failed to send malformed status: %v", err) |
||||||
|
} |
||||||
|
select { |
||||||
|
case <-tester.termed: |
||||||
|
case <-time.After(time.Second): |
||||||
|
t.Fatalf("remote close timed out") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestPeerHandshakeSuccess(t *testing.T) { |
||||||
|
tester := startTestPeer() |
||||||
|
|
||||||
|
// Wait for and check the handshake
|
||||||
|
if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { |
||||||
|
t.Fatalf("status message mismatch: %v", err) |
||||||
|
} |
||||||
|
// Send a valid handshake status and make sure connection stays live
|
||||||
|
if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { |
||||||
|
t.Fatalf("failed to send status: %v", err) |
||||||
|
} |
||||||
|
select { |
||||||
|
case <-tester.termed: |
||||||
|
t.Fatalf("valid handshake disconnected") |
||||||
|
|
||||||
|
case <-time.After(100 * time.Millisecond): |
||||||
|
} |
||||||
|
// Clean up the test
|
||||||
|
tester.stream.Close() |
||||||
|
|
||||||
|
select { |
||||||
|
case <-tester.termed: |
||||||
|
case <-time.After(time.Second): |
||||||
|
t.Fatalf("local close timed out") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestPeerSend(t *testing.T) { |
||||||
|
// Start a tester and execute the handshake
|
||||||
|
tester, err := startTestPeerInited() |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to start initialized peer: %v", err) |
||||||
|
} |
||||||
|
defer tester.stream.Close() |
||||||
|
|
||||||
|
// Construct a message and inject into the tester
|
||||||
|
message := NewMessage([]byte("peer broadcast test message")) |
||||||
|
envelope, err := message.Wrap(DefaultPoW, Options{ |
||||||
|
TTL: DefaultTTL, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to wrap message: %v", err) |
||||||
|
} |
||||||
|
if err := tester.client.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to send message: %v", err) |
||||||
|
} |
||||||
|
// Check that the message is eventually forwarded
|
||||||
|
payload := []interface{}{envelope} |
||||||
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { |
||||||
|
t.Fatalf("message mismatch: %v", err) |
||||||
|
} |
||||||
|
// Make sure that even with a re-insert, an empty batch is received
|
||||||
|
if err := tester.client.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to send message: %v", err) |
||||||
|
} |
||||||
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { |
||||||
|
t.Fatalf("message mismatch: %v", err) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestPeerDeliver(t *testing.T) { |
||||||
|
// Start a tester and execute the handshake
|
||||||
|
tester, err := startTestPeerInited() |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to start initialized peer: %v", err) |
||||||
|
} |
||||||
|
defer tester.stream.Close() |
||||||
|
|
||||||
|
// Watch for all inbound messages
|
||||||
|
arrived := make(chan struct{}, 1) |
||||||
|
tester.client.Watch(Filter{ |
||||||
|
Fn: func(message *Message) { |
||||||
|
arrived <- struct{}{} |
||||||
|
}, |
||||||
|
}) |
||||||
|
// Construct a message and deliver it to the tester peer
|
||||||
|
message := NewMessage([]byte("peer broadcast test message")) |
||||||
|
envelope, err := message.Wrap(DefaultPoW, Options{ |
||||||
|
TTL: DefaultTTL, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to wrap message: %v", err) |
||||||
|
} |
||||||
|
if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { |
||||||
|
t.Fatalf("failed to transfer message: %v", err) |
||||||
|
} |
||||||
|
// Check that the message is delivered upstream
|
||||||
|
select { |
||||||
|
case <-arrived: |
||||||
|
case <-time.After(time.Second): |
||||||
|
t.Fatalf("message delivery timeout") |
||||||
|
} |
||||||
|
// Check that a resend is not delivered
|
||||||
|
if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { |
||||||
|
t.Fatalf("failed to transfer message: %v", err) |
||||||
|
} |
||||||
|
select { |
||||||
|
case <-time.After(2 * transmissionCycle): |
||||||
|
case <-arrived: |
||||||
|
t.Fatalf("repeating message arrived") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestPeerMessageExpiration(t *testing.T) { |
||||||
|
// Start a tester and execute the handshake
|
||||||
|
tester, err := startTestPeerInited() |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to start initialized peer: %v", err) |
||||||
|
} |
||||||
|
defer tester.stream.Close() |
||||||
|
|
||||||
|
// Fetch the peer instance for later inspection
|
||||||
|
tester.client.peerMu.RLock() |
||||||
|
if peers := len(tester.client.peers); peers != 1 { |
||||||
|
t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1) |
||||||
|
} |
||||||
|
var peer *peer |
||||||
|
for peer, _ = range tester.client.peers { |
||||||
|
break |
||||||
|
} |
||||||
|
tester.client.peerMu.RUnlock() |
||||||
|
|
||||||
|
// Construct a message and pass it through the tester
|
||||||
|
message := NewMessage([]byte("peer test message")) |
||||||
|
envelope, err := message.Wrap(DefaultPoW, Options{ |
||||||
|
TTL: time.Second, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to wrap message: %v", err) |
||||||
|
} |
||||||
|
if err := tester.client.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to send message: %v", err) |
||||||
|
} |
||||||
|
payload := []interface{}{envelope} |
||||||
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { |
||||||
|
t.Fatalf("message mismatch: %v", err) |
||||||
|
} |
||||||
|
// Check that the message is inside the cache
|
||||||
|
if !peer.known.Has(envelope.Hash()) { |
||||||
|
t.Fatalf("message not found in cache") |
||||||
|
} |
||||||
|
// Discard messages until expiration and check cache again
|
||||||
|
exp := time.Now().Add(time.Second + expirationCycle) |
||||||
|
for time.Now().Before(exp) { |
||||||
|
if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { |
||||||
|
t.Fatalf("message mismatch: %v", err) |
||||||
|
} |
||||||
|
} |
||||||
|
if peer.known.Has(envelope.Hash()) { |
||||||
|
t.Fatalf("message not expired from cache") |
||||||
|
} |
||||||
|
} |
@ -1,29 +0,0 @@ |
|||||||
package whisper |
|
||||||
|
|
||||||
import ( |
|
||||||
"sort" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common" |
|
||||||
) |
|
||||||
|
|
||||||
type sortedKeys struct { |
|
||||||
k []int32 |
|
||||||
} |
|
||||||
|
|
||||||
func (self *sortedKeys) Len() int { return len(self.k) } |
|
||||||
func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } |
|
||||||
func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } |
|
||||||
|
|
||||||
func sortKeys(m map[int32]common.Hash) []int32 { |
|
||||||
sorted := new(sortedKeys) |
|
||||||
sorted.k = make([]int32, len(m)) |
|
||||||
i := 0 |
|
||||||
for key, _ := range m { |
|
||||||
sorted.k[i] = key |
|
||||||
i++ |
|
||||||
} |
|
||||||
|
|
||||||
sort.Sort(sorted) |
|
||||||
|
|
||||||
return sorted.k |
|
||||||
} |
|
@ -1,23 +0,0 @@ |
|||||||
package whisper |
|
||||||
|
|
||||||
import ( |
|
||||||
"testing" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common" |
|
||||||
) |
|
||||||
|
|
||||||
func TestSorting(t *testing.T) { |
|
||||||
m := map[int32]common.Hash{ |
|
||||||
1: {1}, |
|
||||||
3: {3}, |
|
||||||
2: {2}, |
|
||||||
5: {5}, |
|
||||||
} |
|
||||||
exp := []int32{1, 2, 3, 5} |
|
||||||
res := sortKeys(m) |
|
||||||
for i, k := range res { |
|
||||||
if k != exp[i] { |
|
||||||
t.Error(k, "failed. Expected", exp[i]) |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
@ -0,0 +1,61 @@ |
|||||||
|
// Contains the Whisper protocol Topic element. For formal details please see
|
||||||
|
// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics.
|
||||||
|
|
||||||
|
package whisper |
||||||
|
|
||||||
|
import "github.com/ethereum/go-ethereum/crypto" |
||||||
|
|
||||||
|
// Topic represents a cryptographically secure, probabilistic partial
|
||||||
|
// classifications of a message, determined as the first (left) 4 bytes of the
|
||||||
|
// SHA3 hash of some arbitrary data given by the original author of the message.
|
||||||
|
type Topic [4]byte |
||||||
|
|
||||||
|
// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data.
|
||||||
|
func NewTopic(data []byte) Topic { |
||||||
|
prefix := [4]byte{} |
||||||
|
copy(prefix[:], crypto.Sha3(data)[:4]) |
||||||
|
return Topic(prefix) |
||||||
|
} |
||||||
|
|
||||||
|
// NewTopics creates a list of topics from a list of binary data elements, by
|
||||||
|
// iteratively calling NewTopic on each of them.
|
||||||
|
func NewTopics(data ...[]byte) []Topic { |
||||||
|
topics := make([]Topic, len(data)) |
||||||
|
for i, element := range data { |
||||||
|
topics[i] = NewTopic(element) |
||||||
|
} |
||||||
|
return topics |
||||||
|
} |
||||||
|
|
||||||
|
// NewTopicFromString creates a topic using the binary data contents of the
|
||||||
|
// specified string.
|
||||||
|
func NewTopicFromString(data string) Topic { |
||||||
|
return NewTopic([]byte(data)) |
||||||
|
} |
||||||
|
|
||||||
|
// NewTopicsFromStrings creates a list of topics from a list of textual data
|
||||||
|
// elements, by iteratively calling NewTopicFromString on each of them.
|
||||||
|
func NewTopicsFromStrings(data ...string) []Topic { |
||||||
|
topics := make([]Topic, len(data)) |
||||||
|
for i, element := range data { |
||||||
|
topics[i] = NewTopicFromString(element) |
||||||
|
} |
||||||
|
return topics |
||||||
|
} |
||||||
|
|
||||||
|
// String converts a topic byte array to a string representation.
|
||||||
|
func (self *Topic) String() string { |
||||||
|
return string(self[:]) |
||||||
|
} |
||||||
|
|
||||||
|
// TopicSet represents a hash set to check if a topic exists or not.
|
||||||
|
type topicSet map[string]struct{} |
||||||
|
|
||||||
|
// NewTopicSet creates a topic hash set from a slice of topics.
|
||||||
|
func newTopicSet(topics []Topic) topicSet { |
||||||
|
set := make(map[string]struct{}) |
||||||
|
for _, topic := range topics { |
||||||
|
set[topic.String()] = struct{}{} |
||||||
|
} |
||||||
|
return topicSet(set) |
||||||
|
} |
@ -0,0 +1,67 @@ |
|||||||
|
package whisper |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"testing" |
||||||
|
) |
||||||
|
|
||||||
|
var topicCreationTests = []struct { |
||||||
|
data []byte |
||||||
|
hash [4]byte |
||||||
|
}{ |
||||||
|
{hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil}, |
||||||
|
{hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}}, |
||||||
|
{hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")}, |
||||||
|
} |
||||||
|
|
||||||
|
func TestTopicCreation(t *testing.T) { |
||||||
|
// Create the topics individually
|
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
topic := NewTopic(tt.data) |
||||||
|
if bytes.Compare(topic[:], tt.hash[:]) != 0 { |
||||||
|
t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) |
||||||
|
} |
||||||
|
} |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
topic := NewTopicFromString(string(tt.data)) |
||||||
|
if bytes.Compare(topic[:], tt.hash[:]) != 0 { |
||||||
|
t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) |
||||||
|
} |
||||||
|
} |
||||||
|
// Create the topics in batches
|
||||||
|
binaryData := make([][]byte, len(topicCreationTests)) |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
binaryData[i] = tt.data |
||||||
|
} |
||||||
|
textualData := make([]string, len(topicCreationTests)) |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
textualData[i] = string(tt.data) |
||||||
|
} |
||||||
|
|
||||||
|
topics := NewTopics(binaryData...) |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { |
||||||
|
t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) |
||||||
|
} |
||||||
|
} |
||||||
|
topics = NewTopicsFromStrings(textualData...) |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { |
||||||
|
t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestTopicSetCreation(t *testing.T) { |
||||||
|
topics := make([]Topic, len(topicCreationTests)) |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
topics[i] = NewTopic(tt.data) |
||||||
|
} |
||||||
|
set := newTopicSet(topics) |
||||||
|
for i, tt := range topicCreationTests { |
||||||
|
topic := NewTopic(tt.data) |
||||||
|
if _, ok := set[topic.String()]; !ok { |
||||||
|
t.Errorf("topic %d: not found in set", i) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -1,36 +0,0 @@ |
|||||||
package whisper |
|
||||||
|
|
||||||
import "github.com/ethereum/go-ethereum/crypto" |
|
||||||
|
|
||||||
func hashTopic(topic []byte) []byte { |
|
||||||
return crypto.Sha3(topic)[:4] |
|
||||||
} |
|
||||||
|
|
||||||
// NOTE this isn't DRY, but I don't want to iterate twice.
|
|
||||||
|
|
||||||
// Returns a formatted topics byte slice.
|
|
||||||
// data: unformatted data (e.g., no hashes needed)
|
|
||||||
func Topics(data [][]byte) [][]byte { |
|
||||||
d := make([][]byte, len(data)) |
|
||||||
for i, byts := range data { |
|
||||||
d[i] = hashTopic(byts) |
|
||||||
} |
|
||||||
return d |
|
||||||
} |
|
||||||
|
|
||||||
func TopicsFromString(data ...string) [][]byte { |
|
||||||
d := make([][]byte, len(data)) |
|
||||||
for i, str := range data { |
|
||||||
d[i] = hashTopic([]byte(str)) |
|
||||||
} |
|
||||||
return d |
|
||||||
} |
|
||||||
|
|
||||||
func bytesToMap(s [][]byte) map[string]struct{} { |
|
||||||
m := make(map[string]struct{}) |
|
||||||
for _, topic := range s { |
|
||||||
m[string(topic)] = struct{}{} |
|
||||||
} |
|
||||||
|
|
||||||
return m |
|
||||||
} |
|
@ -1,38 +1,185 @@ |
|||||||
package whisper |
package whisper |
||||||
|
|
||||||
import ( |
import ( |
||||||
"fmt" |
|
||||||
"testing" |
"testing" |
||||||
"time" |
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/p2p" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover" |
||||||
) |
) |
||||||
|
|
||||||
func TestEvent(t *testing.T) { |
func startTestCluster(n int) []*Whisper { |
||||||
res := make(chan *Message, 1) |
// Create the batch of simulated peers
|
||||||
whisper := New() |
nodes := make([]*p2p.Peer, n) |
||||||
id := whisper.NewIdentity() |
for i := 0; i < n; i++ { |
||||||
whisper.Watch(Filter{ |
nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil) |
||||||
To: &id.PublicKey, |
} |
||||||
|
whispers := make([]*Whisper, n) |
||||||
|
for i := 0; i < n; i++ { |
||||||
|
whispers[i] = New() |
||||||
|
whispers[i].Start() |
||||||
|
} |
||||||
|
// Wire all the peers to the root one
|
||||||
|
for i := 1; i < n; i++ { |
||||||
|
src, dst := p2p.MsgPipe() |
||||||
|
|
||||||
|
go whispers[0].handlePeer(nodes[i], src) |
||||||
|
go whispers[i].handlePeer(nodes[0], dst) |
||||||
|
} |
||||||
|
return whispers |
||||||
|
} |
||||||
|
|
||||||
|
func TestSelfMessage(t *testing.T) { |
||||||
|
// Start the single node cluster
|
||||||
|
client := startTestCluster(1)[0] |
||||||
|
|
||||||
|
// Start watching for self messages, signal any arrivals
|
||||||
|
self := client.NewIdentity() |
||||||
|
done := make(chan struct{}) |
||||||
|
|
||||||
|
client.Watch(Filter{ |
||||||
|
To: &self.PublicKey, |
||||||
Fn: func(msg *Message) { |
Fn: func(msg *Message) { |
||||||
res <- msg |
close(done) |
||||||
}, |
}, |
||||||
}) |
}) |
||||||
|
// Send a dummy message to oneself
|
||||||
msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) |
msg := NewMessage([]byte("self whisper")) |
||||||
envelope, err := msg.Wrap(DefaultProofOfWork, Options{ |
envelope, err := msg.Wrap(DefaultPoW, Options{ |
||||||
TTL: DefaultTimeToLive, |
From: self, |
||||||
From: id, |
To: &self.PublicKey, |
||||||
To: &id.PublicKey, |
TTL: DefaultTTL, |
||||||
}) |
}) |
||||||
if err != nil { |
if err != nil { |
||||||
fmt.Println(err) |
t.Fatalf("failed to wrap message: %v", err) |
||||||
t.FailNow() |
} |
||||||
|
// Dump the message into the system and wait for it to pop back out
|
||||||
|
if err := client.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to send self-message: %v", err) |
||||||
|
} |
||||||
|
select { |
||||||
|
case <-done: |
||||||
|
case <-time.After(time.Second): |
||||||
|
t.Fatalf("self-message receive timeout") |
||||||
} |
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestDirectMessage(t *testing.T) { |
||||||
|
// Start the sender-recipient cluster
|
||||||
|
cluster := startTestCluster(2) |
||||||
|
|
||||||
tick := time.NewTicker(time.Second) |
sender := cluster[0] |
||||||
whisper.postEvent(envelope) |
senderId := sender.NewIdentity() |
||||||
|
|
||||||
|
recipient := cluster[1] |
||||||
|
recipientId := recipient.NewIdentity() |
||||||
|
|
||||||
|
// Watch for arriving messages on the recipient
|
||||||
|
done := make(chan struct{}) |
||||||
|
recipient.Watch(Filter{ |
||||||
|
To: &recipientId.PublicKey, |
||||||
|
Fn: func(msg *Message) { |
||||||
|
close(done) |
||||||
|
}, |
||||||
|
}) |
||||||
|
// Send a dummy message from the sender
|
||||||
|
msg := NewMessage([]byte("direct whisper")) |
||||||
|
envelope, err := msg.Wrap(DefaultPoW, Options{ |
||||||
|
From: senderId, |
||||||
|
To: &recipientId.PublicKey, |
||||||
|
TTL: DefaultTTL, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to wrap message: %v", err) |
||||||
|
} |
||||||
|
if err := sender.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to send direct message: %v", err) |
||||||
|
} |
||||||
|
// Wait for an arrival or a timeout
|
||||||
select { |
select { |
||||||
case <-res: |
case <-done: |
||||||
case <-tick.C: |
case <-time.After(time.Second): |
||||||
t.Error("did not receive message") |
t.Fatalf("direct message receive timeout") |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestAnonymousBroadcast(t *testing.T) { |
||||||
|
testBroadcast(true, t) |
||||||
|
} |
||||||
|
|
||||||
|
func TestIdentifiedBroadcast(t *testing.T) { |
||||||
|
testBroadcast(false, t) |
||||||
|
} |
||||||
|
|
||||||
|
func testBroadcast(anonymous bool, t *testing.T) { |
||||||
|
// Start the single sender multi recipient cluster
|
||||||
|
cluster := startTestCluster(3) |
||||||
|
|
||||||
|
sender := cluster[1] |
||||||
|
targets := cluster[1:] |
||||||
|
for _, target := range targets { |
||||||
|
if !anonymous { |
||||||
|
target.NewIdentity() |
||||||
|
} |
||||||
|
} |
||||||
|
// Watch for arriving messages on the recipients
|
||||||
|
dones := make([]chan struct{}, len(targets)) |
||||||
|
for i := 0; i < len(targets); i++ { |
||||||
|
done := make(chan struct{}) // need for the closure
|
||||||
|
dones[i] = done |
||||||
|
|
||||||
|
targets[i].Watch(Filter{ |
||||||
|
Topics: NewTopicsFromStrings("broadcast topic"), |
||||||
|
Fn: func(msg *Message) { |
||||||
|
close(done) |
||||||
|
}, |
||||||
|
}) |
||||||
|
} |
||||||
|
// Send a dummy message from the sender
|
||||||
|
msg := NewMessage([]byte("broadcast whisper")) |
||||||
|
envelope, err := msg.Wrap(DefaultPoW, Options{ |
||||||
|
Topics: NewTopicsFromStrings("broadcast topic"), |
||||||
|
TTL: DefaultTTL, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to wrap message: %v", err) |
||||||
|
} |
||||||
|
if err := sender.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to send broadcast message: %v", err) |
||||||
|
} |
||||||
|
// Wait for an arrival on each recipient, or timeouts
|
||||||
|
timeout := time.After(time.Second) |
||||||
|
for _, done := range dones { |
||||||
|
select { |
||||||
|
case <-done: |
||||||
|
case <-timeout: |
||||||
|
t.Fatalf("broadcast message receive timeout") |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestMessageExpiration(t *testing.T) { |
||||||
|
// Start the single node cluster and inject a dummy message
|
||||||
|
node := startTestCluster(1)[0] |
||||||
|
|
||||||
|
message := NewMessage([]byte("expiring message")) |
||||||
|
envelope, err := message.Wrap(DefaultPoW, Options{ |
||||||
|
TTL: time.Second, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
t.Fatalf("failed to wrap message: %v", err) |
||||||
|
} |
||||||
|
if err := node.Send(envelope); err != nil { |
||||||
|
t.Fatalf("failed to inject message: %v", err) |
||||||
|
} |
||||||
|
// Check that the message is inside the cache
|
||||||
|
if _, ok := node.messages[envelope.Hash()]; !ok { |
||||||
|
t.Fatalf("message not found in cache") |
||||||
|
} |
||||||
|
// Wait for expiration and check cache again
|
||||||
|
time.Sleep(time.Second) // wait for expiration
|
||||||
|
time.Sleep(expirationCycle) // wait for cleanup cycle
|
||||||
|
if _, ok := node.messages[envelope.Hash()]; ok { |
||||||
|
t.Fatalf("message not expired from cache") |
||||||
} |
} |
||||||
} |
} |
||||||
|
Loading…
Reference in new issue