From 3563c59b12b0b8b5fd15847bf97d71dfd8416207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 17 Apr 2015 16:45:44 +0300 Subject: [PATCH 01/14] rpc, whisper, xeth: polish whisper RPC interface --- rpc/api.go | 17 ++--- whisper/whisper.go | 13 +--- xeth/whisper.go | 135 ++++++++++++++++------------------------ xeth/whisper_filter.go | 26 ++++++++ xeth/whisper_message.go | 31 +++++++++ xeth/xeth.go | 27 ++------ 6 files changed, 129 insertions(+), 120 deletions(-) create mode 100644 xeth/whisper_filter.go create mode 100644 xeth/whisper_message.go diff --git a/rpc/api.go b/rpc/api.go index 5930a4c7b3..614e087640 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -408,18 +408,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = newHexData(res) case "shh_version": *reply = api.xeth().WhisperVersion() + case "shh_post": args := new(WhisperMessageArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - err := api.xeth().Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl) if err != nil { return err } - *reply = true + case "shh_newIdentity": *reply = api.xeth().Whisper().NewIdentity() // case "shh_removeIdentity": @@ -434,32 +434,35 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err return err } *reply = api.xeth().Whisper().HasIdentity(args.Identity) + case "shh_newGroup", "shh_addToGroup": return NewNotImplementedError(req.Method) + case "shh_newFilter": args := new(WhisperFilterArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } - opts := new(xeth.Options) - // opts.From = args.From - opts.To = args.To - opts.Topics = args.Topics - id := api.xeth().NewWhisperFilter(opts) + id := api.xeth().NewWhisperFilter(args.To, args.From, args.Topics) *reply = newHexNum(big.NewInt(int64(id)).Bytes()) + case "shh_uninstallFilter": args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().UninstallWhisperFilter(args.Id) + case "shh_getFilterChanges": + // Retrieve all the new messages arrived since the last request args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().MessagesChanged(args.Id) + case "shh_getMessages": + // Retrieve all the cached messages matching a specific, existing filter args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err diff --git a/whisper/whisper.go b/whisper/whisper.go index 9317fad50e..59a1a63c48 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -58,6 +58,8 @@ type Whisper struct { quit chan struct{} } +// New creates a Whisper client ready to communicate through the Ethereum P2P +// network. func New() *Whisper { whisper := &Whisper{ filters: filter.New(), @@ -148,7 +150,7 @@ func (self *Whisper) Stop() { glog.V(logger.Info).Infoln("Whisper stopped") } -// Messages retrieves the currently pooled messages matching a filter id. +// Messages retrieves all the currently pooled messages matching a filter id. func (self *Whisper) Messages(id int) []*Message { messages := make([]*Message, 0) if filter := self.filters.Get(id); filter != nil { @@ -163,15 +165,6 @@ func (self *Whisper) Messages(id int) []*Message { return messages } -// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { -// k := string(crypto.FromECDSAPub(key)) -// if _, ok := self.keys[k]; ok { -// delete(self.keys, k) -// return true -// } -// return false -// } - // handlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/xeth/whisper.go b/xeth/whisper.go index 342910b5c3..386897f395 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -1,7 +1,9 @@ +// Contains the external API to the whisper sub-protocol. + package xeth import ( - "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -12,109 +14,78 @@ import ( var qlogger = logger.NewLogger("XSHH") +// Whisper represents the API wrapper around the internal whisper implementation. type Whisper struct { *whisper.Whisper } +// NewWhisper wraps an internal whisper client into an external API version. func NewWhisper(w *whisper.Whisper) *Whisper { return &Whisper{w} } -func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { - if priority == 0 { - priority = 1000 - } - - if ttl == 0 { - ttl = 100 - } - - pk := crypto.ToECDSAPub(common.FromHex(from)) - if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 { - msg := whisper.NewMessage(common.FromHex(payload)) - envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{ - TTL: time.Duration(ttl) * time.Second, - To: crypto.ToECDSAPub(common.FromHex(to)), - From: key, - Topics: whisper.NewTopicsFromStrings(topics...), - }) - - if err != nil { - return err - } - - if err := self.Whisper.Send(envelope); err != nil { - return err - } - } else { - return errors.New("unmatched pub / priv for seal") - } - - return nil -} - +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() string { - key := self.Whisper.NewIdentity() - - return common.ToHex(crypto.FromECDSAPub(&key.PublicKey)) + identity := self.Whisper.NewIdentity() + return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)) } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSAPub(common.FromHex(key))) } -// func (self *Whisper) RemoveIdentity(key string) bool { -// return self.Whisper.RemoveIdentity(crypto.ToECDSAPub(common.FromHex(key))) -// } - -func (self *Whisper) Watch(opts *Options) int { - filter := whisper.Filter{ - To: crypto.ToECDSAPub(common.FromHex(opts.To)), - From: crypto.ToECDSAPub(common.FromHex(opts.From)), - Topics: whisper.NewTopicsFromStrings(opts.Topics...), +// Post injects a message into the whisper network for distribution. +func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Construct the whisper message and transmission options + message := whisper.NewMessage(common.FromHex(payload)) + options := whisper.Options{ + To: crypto.ToECDSAPub(common.FromHex(to)), + TTL: time.Duration(ttl) * time.Second, + Topics: whisper.NewTopicsFromStrings(topics...), } - - var i int - filter.Fn = func(msg *whisper.Message) { - opts.Fn(NewWhisperMessage(msg)) + if len(from) != 0 { + if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { + options.From = key + } else { + return fmt.Errorf("unknown identity to send from: %s", from) + } } - - i = self.Whisper.Watch(filter) - - return i -} - -func (self *Whisper) Messages(id int) (messages []WhisperMessage) { - msgs := self.Whisper.Messages(id) - messages = make([]WhisperMessage, len(msgs)) - for i, message := range msgs { - messages[i] = NewWhisperMessage(message) + // Wrap and send the message + pow := time.Duration(priority) * time.Millisecond + envelope, err := message.Wrap(pow, options) + if err != nil { + return err } - - return + if err := self.Whisper.Send(envelope); err != nil { + return err + } + return nil } -type Options struct { - To string - From string - Topics []string - Fn func(msg WhisperMessage) +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(to, from string, topics []string, fn func(WhisperMessage)) int { + filter := whisper.Filter{ + To: crypto.ToECDSAPub(common.FromHex(to)), + From: crypto.ToECDSAPub(common.FromHex(from)), + Topics: whisper.NewTopicsFromStrings(topics...), + } + filter.Fn = func(message *whisper.Message) { + fn(NewWhisperMessage(message)) + } + return self.Whisper.Watch(filter) } -type WhisperMessage struct { - ref *whisper.Message - Payload string `json:"payload"` - To string `json:"to"` - From string `json:"from"` - Sent int64 `json:"sent"` -} +// Messages retrieves all the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []WhisperMessage { + pool := self.Whisper.Messages(id) -func NewWhisperMessage(msg *whisper.Message) WhisperMessage { - return WhisperMessage{ - ref: msg, - Payload: common.ToHex(msg.Payload), - From: common.ToHex(crypto.FromECDSAPub(msg.Recover())), - To: common.ToHex(crypto.FromECDSAPub(msg.To)), - Sent: msg.Sent, + messages := make([]WhisperMessage, len(pool)) + for i, message := range pool { + messages[i] = NewWhisperMessage(message) } + return messages } diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go new file mode 100644 index 0000000000..9d8a739b76 --- /dev/null +++ b/xeth/whisper_filter.go @@ -0,0 +1,26 @@ +// Contains the external API side message filter for watching, pooling and polling +// matched whisper messages. + +package xeth + +import "time" + +// whisperFilter is the message cache matching a specific filter, accumulating +// inbound messages until the are requested by the client. +type whisperFilter struct { + id int // Filter identifier + cache []WhisperMessage // Cache of messages not yet polled + timeout time.Time // Time when the last message batch was queries +} + +// insert injects a new batch of messages into the filter cache. +func (w *whisperFilter) insert(msgs ...WhisperMessage) { + w.cache = append(w.cache, msgs...) +} + +// retrieve fetches all the cached messages from the filter. +func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + messages, w.cache = w.cache, nil + w.timeout = time.Now() + return +} diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go new file mode 100644 index 0000000000..14796cfbc1 --- /dev/null +++ b/xeth/whisper_message.go @@ -0,0 +1,31 @@ +// Contains the external API representation of a whisper message. + +package xeth + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/whisper" +) + +// WhisperMessage is the external API representation of a whisper.Message. +type WhisperMessage struct { + ref *whisper.Message + + Payload string `json:"payload"` + To string `json:"to"` + From string `json:"from"` + Sent int64 `json:"sent"` +} + +// NewWhisperMessage converts an internal message into an API version. +func NewWhisperMessage(message *whisper.Message) WhisperMessage { + return WhisperMessage{ + ref: message, + + Payload: common.ToHex(message.Payload), + From: common.ToHex(crypto.FromECDSAPub(message.Recover())), + To: common.ToHex(crypto.FromECDSAPub(message.To)), + Sent: message.Sent, + } +} diff --git a/xeth/xeth.go b/xeth/xeth.go index 693acb9107..e7e553036f 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,14 +452,15 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { var id int - opts.Fn = func(msg WhisperMessage) { + callback := func(msg WhisperMessage) { p.messagesMut.Lock() defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) + id = p.Whisper().Watch(to, from, topics, callback) p.messages[id] = &whisperFilter{timeout: time.Now()} return id } @@ -478,7 +479,7 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { defer self.messagesMut.Unlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].retrieve() } return nil @@ -731,22 +732,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time From 5aa523e32bedd923c4075a21daefd1b4a512277c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 17 Apr 2015 18:19:39 +0300 Subject: [PATCH 02/14] whisper: fix send timestamp omission during envelope opening --- whisper/envelope.go | 1 + 1 file changed, 1 insertion(+) diff --git a/whisper/envelope.go b/whisper/envelope.go index 07762c3004..ba3e4ccc94 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -72,6 +72,7 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { message := &Message{ Flags: data[0], + Sent: int64(self.Expiry - self.TTL), } data = data[1:] From 7948cc0029db76557d6540341bdfeb818ce32c65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 20 Apr 2015 14:56:38 +0300 Subject: [PATCH 03/14] rpc, whisper, xeth: fix RPC message retrieval data race --- rpc/api.go | 2 +- whisper/envelope.go | 1 + whisper/envelope_test.go | 36 +++++++++++++++++++ whisper/message.go | 6 ++-- xeth/whisper_filter.go | 74 +++++++++++++++++++++++++++++++++++----- xeth/xeth.go | 13 +++++-- 6 files changed, 119 insertions(+), 13 deletions(-) create mode 100644 whisper/envelope_test.go diff --git a/rpc/api.go b/rpc/api.go index 614e087640..a73188f07a 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -467,7 +467,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().Whisper().Messages(args.Id) + *reply = api.xeth().Messages(args.Id) // case "eth_register": // // Placeholder for actual type diff --git a/whisper/envelope.go b/whisper/envelope.go index ba3e4ccc94..c1d84df78b 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -73,6 +73,7 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { message := &Message{ Flags: data[0], Sent: int64(self.Expiry - self.TTL), + Hash: self.Hash(), } data = data[1:] diff --git a/whisper/envelope_test.go b/whisper/envelope_test.go new file mode 100644 index 0000000000..ed1f083651 --- /dev/null +++ b/whisper/envelope_test.go @@ -0,0 +1,36 @@ +package whisper + +import ( + "bytes" + "testing" +) + +func TestEnvelopeOpen(t *testing.T) { + payload := []byte("hello world") + message := NewMessage(payload) + + envelope, err := message.Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v.", err) + } + if opened.Flags != message.Flags { + t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) + } + if bytes.Compare(opened.Signature, message.Signature) != 0 { + t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature) + } + if bytes.Compare(opened.Payload, message.Payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) + } + if opened.Sent != message.Sent { + t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) + } + + if opened.Hash != envelope.Hash() { + t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) + } +} diff --git a/whisper/message.go b/whisper/message.go index 07c6735677..69d85b8948 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -8,12 +8,13 @@ import ( "math/rand" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) -// Message represents an end-user data packet to trasmit through the Whisper +// Message represents an end-user data packet to transmit through the Whisper // protocol. These are wrapped into Envelopes that need not be understood by // intermediate nodes, just forwarded. type Message struct { @@ -22,7 +23,8 @@ type Message struct { Payload []byte Sent int64 - To *ecdsa.PublicKey + To *ecdsa.PublicKey // Message recipient (identity used to decode the message) + Hash common.Hash // Message envelope hash to act as a unique id in de-duplication } // Options specifies the exact way a message should be wrapped into an Envelope. diff --git a/xeth/whisper_filter.go b/xeth/whisper_filter.go index 9d8a739b76..52e70e041d 100644 --- a/xeth/whisper_filter.go +++ b/xeth/whisper_filter.go @@ -1,26 +1,84 @@ // Contains the external API side message filter for watching, pooling and polling -// matched whisper messages. +// matched whisper messages, also serializing data access to avoid duplications. package xeth -import "time" +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) // whisperFilter is the message cache matching a specific filter, accumulating // inbound messages until the are requested by the client. type whisperFilter struct { - id int // Filter identifier - cache []WhisperMessage // Cache of messages not yet polled - timeout time.Time // Time when the last message batch was queries + id int // Filter identifier for old message retrieval + ref *Whisper // Whisper reference for old message retrieval + + cache []WhisperMessage // Cache of messages not yet polled + skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication + update time.Time // Time of the last message query + + lock sync.RWMutex // Lock protecting the filter internals +} + +// newWhisperFilter creates a new serialized, poll based whisper topic filter. +func newWhisperFilter(id int, ref *Whisper) *whisperFilter { + return &whisperFilter{ + id: id, + ref: ref, + + update: time.Now(), + skip: make(map[common.Hash]struct{}), + } +} + +// messages retrieves all the cached messages from the entire pool matching the +// filter, resetting the filter's change buffer. +func (w *whisperFilter) messages() []WhisperMessage { + w.lock.Lock() + defer w.lock.Unlock() + + w.cache = nil + w.update = time.Now() + + w.skip = make(map[common.Hash]struct{}) + messages := w.ref.Messages(w.id) + for _, message := range messages { + w.skip[message.ref.Hash] = struct{}{} + } + return messages } // insert injects a new batch of messages into the filter cache. -func (w *whisperFilter) insert(msgs ...WhisperMessage) { - w.cache = append(w.cache, msgs...) +func (w *whisperFilter) insert(messages ...WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, message := range messages { + if _, ok := w.skip[message.ref.Hash]; !ok { + w.cache = append(w.cache, messages...) + } + } } // retrieve fetches all the cached messages from the filter. func (w *whisperFilter) retrieve() (messages []WhisperMessage) { + w.lock.Lock() + defer w.lock.Unlock() + messages, w.cache = w.cache, nil - w.timeout = time.Now() + w.update = time.Now() + return } + +// activity returns the last time instance when client requests were executed on +// the filter. +func (w *whisperFilter) activity() time.Time { + w.lock.RLock() + defer w.lock.RUnlock() + + return w.update +} diff --git a/xeth/xeth.go b/xeth/xeth.go index e7e553036f..8cc32c9586 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -461,7 +461,7 @@ func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { p.messages[id].insert(msg) } id = p.Whisper().Watch(to, from, topics, callback) - p.messages[id] = &whisperFilter{timeout: time.Now()} + p.messages[id] = newWhisperFilter(id, p.Whisper()) return id } @@ -481,7 +481,16 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { if self.messages[id] != nil { return self.messages[id].retrieve() } + return nil +} + +func (self *XEth) Messages(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + if self.messages[id] != nil { + return self.messages[id].messages() + } return nil } From 19bc4624eaefc2c8201260e7afa1a5893159bffc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 20 Apr 2015 14:58:23 +0300 Subject: [PATCH 04/14] eth: pull in a lost merge change Ref: https://github.com/Gustav-Simonsson/go-ethereum/commit/21c4c155ee68890a069654dcc5bc083a867f65cd --- eth/backend.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 783f339088..466912899a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -222,10 +222,12 @@ func New(config *Config) (*Ethereum, error) { eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) - eth.whisper = whisper.New() - eth.shhVersionId = int(eth.whisper.Version()) eth.miner = miner.New(eth, eth.pow, config.MinerThreads) eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) + if config.Shh { + eth.whisper = whisper.New() + eth.shhVersionId = int(eth.whisper.Version()) + } netprv, err := config.nodeKey() if err != nil { From 7f48eb8737878e352a65475382532db26f9fbc52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Apr 2015 11:43:11 +0300 Subject: [PATCH 05/14] whisper, xeth/whisper: surface TTL and hash to the API --- whisper/envelope.go | 3 ++- whisper/envelope_test.go | 6 +++++- whisper/message.go | 10 +++++++--- whisper/message_test.go | 4 ++++ xeth/whisper_message.go | 8 +++++++- 5 files changed, 25 insertions(+), 6 deletions(-) diff --git a/whisper/envelope.go b/whisper/envelope.go index c1d84df78b..a4e2fa031c 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -72,7 +72,8 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { message := &Message{ Flags: data[0], - Sent: int64(self.Expiry - self.TTL), + Sent: time.Unix(int64(self.Expiry-self.TTL), 0), + TTL: time.Duration(self.TTL) * time.Second, Hash: self.Hash(), } data = data[1:] diff --git a/whisper/envelope_test.go b/whisper/envelope_test.go index ed1f083651..3117284f17 100644 --- a/whisper/envelope_test.go +++ b/whisper/envelope_test.go @@ -3,6 +3,7 @@ package whisper import ( "bytes" "testing" + "time" ) func TestEnvelopeOpen(t *testing.T) { @@ -26,9 +27,12 @@ func TestEnvelopeOpen(t *testing.T) { if bytes.Compare(opened.Payload, message.Payload) != 0 { t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload) } - if opened.Sent != message.Sent { + if opened.Sent.Unix() != message.Sent.Unix() { t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent) } + if opened.TTL/time.Second != DefaultTTL/time.Second { + t.Fatalf("message TTL mismatch: have %v, want %v", opened.TTL, DefaultTTL) + } if opened.Hash != envelope.Hash() { t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) diff --git a/whisper/message.go b/whisper/message.go index 69d85b8948..2b92d515c4 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -21,10 +21,12 @@ type Message struct { Flags byte // First bit is signature presence, rest reserved and should be random Signature []byte Payload []byte - Sent int64 + + Sent time.Time // Time when the message was posted into the network + TTL time.Duration // Maximum time to live allowed for the message To *ecdsa.PublicKey // Message recipient (identity used to decode the message) - Hash common.Hash // Message envelope hash to act as a unique id in de-duplication + Hash common.Hash // Message envelope hash to act as a unique id } // Options specifies the exact way a message should be wrapped into an Envelope. @@ -45,7 +47,7 @@ func NewMessage(payload []byte) *Message { return &Message{ Flags: flags, Payload: payload, - Sent: time.Now().Unix(), + Sent: time.Now(), } } @@ -66,6 +68,8 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) if options.TTL == 0 { options.TTL = DefaultTTL } + self.TTL = options.TTL + // Sign and encrypt the message if requested if options.From != nil { if err := self.sign(options.From); err != nil { diff --git a/whisper/message_test.go b/whisper/message_test.go index 18a254e5c9..0b4a24c248 100644 --- a/whisper/message_test.go +++ b/whisper/message_test.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/elliptic" "testing" + "time" "github.com/ethereum/go-ethereum/crypto" ) @@ -25,6 +26,9 @@ func TestMessageSimpleWrap(t *testing.T) { if bytes.Compare(msg.Payload, payload) != 0 { t.Fatalf("payload mismatch after wrapping: have 0x%x, want 0x%x", msg.Payload, payload) } + if msg.TTL/time.Second != DefaultTTL/time.Second { + t.Fatalf("message TTL mismatch: have %v, want %v", msg.TTL, DefaultTTL) + } } // Tests whether a message can be signed, and wrapped in plain-text. diff --git a/xeth/whisper_message.go b/xeth/whisper_message.go index 14796cfbc1..c8195cec12 100644 --- a/xeth/whisper_message.go +++ b/xeth/whisper_message.go @@ -3,6 +3,8 @@ package xeth import ( + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/whisper" @@ -16,6 +18,8 @@ type WhisperMessage struct { To string `json:"to"` From string `json:"from"` Sent int64 `json:"sent"` + TTL int64 `json:"ttl"` + Hash string `json:"hash"` } // NewWhisperMessage converts an internal message into an API version. @@ -26,6 +30,8 @@ func NewWhisperMessage(message *whisper.Message) WhisperMessage { Payload: common.ToHex(message.Payload), From: common.ToHex(crypto.FromECDSAPub(message.Recover())), To: common.ToHex(crypto.FromECDSAPub(message.To)), - Sent: message.Sent, + Sent: message.Sent.Unix(), + TTL: int64(message.TTL / time.Second), + Hash: common.ToHex(message.Hash.Bytes()), } } From 87447f9f3f99cc59d58b029fff39fc39142f1281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Apr 2015 12:13:57 +0300 Subject: [PATCH 06/14] whisper: fix payload loss in case of plaintext decrypt --- whisper/envelope_test.go | 104 ++++++++++++++++++++++++++++++++++++++- whisper/message.go | 9 ++-- 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/whisper/envelope_test.go b/whisper/envelope_test.go index 3117284f17..b64767b2e7 100644 --- a/whisper/envelope_test.go +++ b/whisper/envelope_test.go @@ -4,6 +4,9 @@ import ( "bytes" "testing" "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" ) func TestEnvelopeOpen(t *testing.T) { @@ -16,7 +19,7 @@ func TestEnvelopeOpen(t *testing.T) { } opened, err := envelope.Open(nil) if err != nil { - t.Fatalf("failed to open envelope: %v.", err) + t.Fatalf("failed to open envelope: %v", err) } if opened.Flags != message.Flags { t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags) @@ -38,3 +41,102 @@ func TestEnvelopeOpen(t *testing.T) { t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash()) } } + +func TestEnvelopeAnonymousOpenUntargeted(t *testing.T) { + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} + +func TestEnvelopeAnonymousOpenTargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(nil) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) == 0 { + t.Fatalf("payload match, should have been encrypted: 0x%x", opened.Payload) + } +} + +func TestEnvelopeIdentifiedOpenUntargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{}) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(key) + switch err { + case nil: + t.Fatalf("envelope opened with bad key: %v", opened) + + case ecies.ErrInvalidPublicKey: + // Ok, key mismatch but opened + + default: + t.Fatalf("failed to open envelope: %v", err) + } + + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} + +func TestEnvelopeIdentifiedOpenTargeted(t *testing.T) { + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("failed to generate test identity: %v", err) + } + + payload := []byte("hello envelope") + envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{ + To: &key.PublicKey, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + opened, err := envelope.Open(key) + if err != nil { + t.Fatalf("failed to open envelope: %v", err) + } + if opened.To != nil { + t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To) + } + if bytes.Compare(opened.Payload, payload) != 0 { + t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload) + } +} diff --git a/whisper/message.go b/whisper/message.go index 2b92d515c4..a80380a92b 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -120,9 +120,12 @@ func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) { } // decrypt decrypts an encrypted payload with a private key. -func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) { - self.Payload, err = crypto.Decrypt(key, self.Payload) - return +func (self *Message) decrypt(key *ecdsa.PrivateKey) error { + cleartext, err := crypto.Decrypt(key, self.Payload) + if err == nil { + self.Payload = cleartext + } + return err } // hash calculates the SHA3 checksum of the message flags and payload. From 15586368e52f49a0f7ea28f890af49d196760846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Apr 2015 12:45:10 +0300 Subject: [PATCH 07/14] whisper: fix spurious From identity with untargeted messages --- whisper/whisper.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/whisper/whisper.go b/whisper/whisper.go index 59a1a63c48..61999f07ad 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -260,9 +260,11 @@ func (self *Whisper) open(envelope *Envelope) *Message { // Iterate over the keys and try to decrypt the message for _, key := range self.keys { message, err := envelope.Open(key) - if err == nil || err == ecies.ErrInvalidPublicKey { + if err == nil { message.To = &key.PublicKey return message + } else if err == ecies.ErrInvalidPublicKey { + return message } } // Failed to decrypt, don't return anything From ae4bfc3cfb3f1debad9dd0211950ce09038ffa90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 21 Apr 2015 18:31:08 +0300 Subject: [PATCH 08/14] rpc, ui/qt/qwhisper, whisper, xeth: introduce complex topic filters --- rpc/api.go | 8 +- rpc/args.go | 69 +++++++++++++---- rpc/args_test.go | 2 +- ui/qt/qwhisper/whisper.go | 2 +- whisper/filter.go | 53 +++++++++++-- whisper/topic.go | 117 +++++++++++++++++++++++++++-- whisper/topic_test.go | 151 +++++++++++++++++++++++++++++++++++--- whisper/whisper.go | 22 +++--- whisper/whisper_test.go | 2 +- xeth/whisper.go | 4 +- xeth/xeth.go | 2 +- 11 files changed, 373 insertions(+), 59 deletions(-) diff --git a/rpc/api.go b/rpc/api.go index a73188f07a..4da2fb17a3 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -422,12 +422,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err case "shh_newIdentity": *reply = api.xeth().Whisper().NewIdentity() - // case "shh_removeIdentity": - // args := new(WhisperIdentityArgs) - // if err := json.Unmarshal(req.Params, &args); err != nil { - // return err - // } - // *reply = api.xeth().Whisper().RemoveIdentity(args.Identity) + case "shh_hasIdentity": args := new(WhisperIdentityArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -439,6 +434,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err return NewNotImplementedError(req.Method) case "shh_newFilter": + // Create a new filter to watch and match messages with args := new(WhisperFilterArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err diff --git a/rpc/args.go b/rpc/args.go index 7694a3d3f4..8df483f088 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -1010,25 +1010,27 @@ func (args *WhisperIdentityArgs) UnmarshalJSON(b []byte) (err error) { } type WhisperFilterArgs struct { - To string `json:"to"` + To string From string - Topics []string + Topics [][]string } +// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a +// JSON message blob into a WhisperFilterArgs structure. func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) { + // Unmarshal the JSON message and sanity check var obj []struct { - To interface{} - Topics []interface{} + To interface{} `json:"to"` + From interface{} `json:"from"` + Topics interface{} `json:"topics"` } - - if err = json.Unmarshal(b, &obj); err != nil { + if err := json.Unmarshal(b, &obj); err != nil { return NewDecodeParamError(err.Error()) } - if len(obj) < 1 { return NewInsufficientParamsError(len(obj), 1) } - + // Retrieve the simple data contents of the filter arguments if obj[0].To == nil { args.To = "" } else { @@ -1038,17 +1040,52 @@ func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) { } args.To = argstr } - - t := make([]string, len(obj[0].Topics)) - for i, j := range obj[0].Topics { - argstr, ok := j.(string) + if obj[0].From == nil { + args.From = "" + } else { + argstr, ok := obj[0].From.(string) if !ok { - return NewInvalidTypeError("topics["+string(i)+"]", "is not a string") + return NewInvalidTypeError("from", "is not a string") } - t[i] = argstr + args.From = argstr + } + // Construct the nested topic array + if obj[0].Topics != nil { + // Make sure we have an actual topic array + list, ok := obj[0].Topics.([]interface{}) + if !ok { + return NewInvalidTypeError("topics", "is not an array") + } + // Iterate over each topic and handle nil, string or array + topics := make([][]string, len(list)) + for idx, field := range list { + switch value := field.(type) { + case nil: + topics[idx] = []string{""} + + case string: + topics[idx] = []string{value} + + case []interface{}: + topics[idx] = make([]string, len(value)) + for i, nested := range value { + switch value := nested.(type) { + case nil: + topics[idx][i] = "" + + case string: + topics[idx][i] = value + + default: + return NewInvalidTypeError(fmt.Sprintf("topic[%d][%d]", idx, i), "is not a string") + } + } + default: + return NewInvalidTypeError(fmt.Sprintf("topic[%d]", idx), "not a string or array") + } + } + args.Topics = topics } - args.Topics = t - return nil } diff --git a/rpc/args_test.go b/rpc/args_test.go index 2f011bfd92..f5949b7a21 100644 --- a/rpc/args_test.go +++ b/rpc/args_test.go @@ -1943,7 +1943,7 @@ func TestWhisperFilterArgs(t *testing.T) { input := `[{"topics": ["0x68656c6c6f20776f726c64"], "to": "0x34ag445g3455b34"}]` expected := new(WhisperFilterArgs) expected.To = "0x34ag445g3455b34" - expected.Topics = []string{"0x68656c6c6f20776f726c64"} + expected.Topics = [][]string{[]string{"0x68656c6c6f20776f726c64"}} args := new(WhisperFilterArgs) if err := json.Unmarshal([]byte(input), &args); err != nil { diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go index 50b0626f56..b7409c57fc 100644 --- a/ui/qt/qwhisper/whisper.go +++ b/ui/qt/qwhisper/whisper.go @@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) { if topicList, ok := opts["topics"].(*qml.List); ok { var topics []string topicList.Convert(&topics) - f.Topics = whisper.NewTopicsFromStrings(topics...) + f.Topics = whisper.NewTopicFilterFromStringsFlat(topics...) } return diff --git a/whisper/filter.go b/whisper/filter.go index 8fcc45afd2..8a398ab76c 100644 --- a/whisper/filter.go +++ b/whisper/filter.go @@ -2,12 +2,55 @@ package whisper -import "crypto/ecdsa" +import ( + "crypto/ecdsa" + + "github.com/ethereum/go-ethereum/event/filter" +) // Filter is used to subscribe to specific types of whisper messages. type Filter struct { - To *ecdsa.PublicKey // Recipient of the message - From *ecdsa.PublicKey // Sender of the message - Topics []Topic // Topics to watch messages on - Fn func(*Message) // Handler in case of a match + To *ecdsa.PublicKey // Recipient of the message + From *ecdsa.PublicKey // Sender of the message + Topics [][]Topic // Topics to filter messages with + Fn func(msg *Message) // Handler in case of a match +} + +// filterer is the internal, fully initialized filter ready to match inbound +// messages to a variety of criteria. +type filterer struct { + to string // Recipient of the message + from string // Sender of the message + matcher *topicMatcher // Topics to filter messages with + fn func(data interface{}) // Handler in case of a match +} + +// Compare checks if the specified filter matches the current one. +func (self filterer) Compare(f filter.Filter) bool { + filter := f.(filterer) + + // Check the message sender and recipient + if len(self.to) > 0 && self.to != filter.to { + return false + } + if len(self.from) > 0 && self.from != filter.from { + return false + } + // Check the topic filtering + topics := make([]Topic, len(filter.matcher.conditions)) + for i, group := range filter.matcher.conditions { + // Message should contain a single topic entry, extract + for topics[i], _ = range group { + break + } + } + if !self.matcher.Matches(topics) { + return false + } + return true +} + +// Trigger is called when a filter successfully matches an inbound message. +func (self filterer) Trigger(data interface{}) { + self.fn(data) } diff --git a/whisper/topic.go b/whisper/topic.go index a965c7cc2a..b2a264e299 100644 --- a/whisper/topic.go +++ b/whisper/topic.go @@ -27,6 +27,26 @@ func NewTopics(data ...[]byte) []Topic { return topics } +// NewTopicFilter creates a 2D topic array used by whisper.Filter from binary +// data elements. +func NewTopicFilter(data ...[][]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + filter[i] = NewTopics(condition...) + } + return filter +} + +// NewTopicFilterFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewTopicFilterFlat(data ...[]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + filter[i] = []Topic{NewTopic(element)} + } + return filter +} + // NewTopicFromString creates a topic using the binary data contents of the // specified string. func NewTopicFromString(data string) Topic { @@ -43,19 +63,100 @@ func NewTopicsFromStrings(data ...string) []Topic { return topics } +// NewTopicFilterFromStrings creates a 2D topic array used by whisper.Filter +// from textual data elements. +func NewTopicFilterFromStrings(data ...[]string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + filter[i] = NewTopicsFromStrings(condition...) + } + return filter +} + +// NewTopicFilterFromStringsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewTopicFilterFromStringsFlat(data ...string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + filter[i] = []Topic{NewTopicFromString(element)} + } + return filter +} + // String converts a topic byte array to a string representation. func (self *Topic) String() string { return string(self[:]) } -// TopicSet represents a hash set to check if a topic exists or not. -type topicSet map[string]struct{} +// topicMatcher is a filter expression to verify if a list of topics contained +// in an arriving message matches some topic conditions. The topic matcher is +// built up of a list of conditions, each of which must be satisfied by the +// corresponding topic in the message. Each condition may require: a) an exact +// topic match; b) a match from a set of topics; or c) a wild-card matching all. +// +// If a message contains more topics than required by the matcher, those beyond +// the condition count are ignored and assumed to match. +// +// Consider the following sample topic matcher: +// sample := { +// {TopicA1, TopicA2, TopicA3}, +// {TopicB}, +// nil, +// {TopicD1, TopicD2} +// } +// In order for a message to pass this filter, it should enumerate at least 4 +// topics, the first any of [TopicA1, TopicA2, TopicA3], the second mandatory +// "TopicB", the third is ignored by the filter and the fourth either "TopicD1" +// or "TopicD2". If the message contains further topics, the filter will match +// them too. +type topicMatcher struct { + conditions []map[Topic]struct{} +} + +// newTopicMatcher create a topic matcher from a list of topic conditions. +func newTopicMatcher(topics ...[]Topic) *topicMatcher { + matcher := make([]map[Topic]struct{}, len(topics)) + for i, condition := range topics { + matcher[i] = make(map[Topic]struct{}) + for _, topic := range condition { + matcher[i][topic] = struct{}{} + } + } + return &topicMatcher{conditions: matcher} +} + +// newTopicMatcherFromBinary create a topic matcher from a list of binary conditions. +func newTopicMatcherFromBinary(data ...[][]byte) *topicMatcher { + topics := make([][]Topic, len(data)) + for i, condition := range data { + topics[i] = NewTopics(condition...) + } + return newTopicMatcher(topics...) +} + +// newTopicMatcherFromStrings creates a topic matcher from a list of textual +// conditions. +func newTopicMatcherFromStrings(data ...[]string) *topicMatcher { + topics := make([][]Topic, len(data)) + for i, condition := range data { + topics[i] = NewTopicsFromStrings(condition...) + } + return newTopicMatcher(topics...) +} -// NewTopicSet creates a topic hash set from a slice of topics. -func newTopicSet(topics []Topic) topicSet { - set := make(map[string]struct{}) - for _, topic := range topics { - set[topic.String()] = struct{}{} +// Matches checks if a list of topics matches this particular condition set. +func (self *topicMatcher) Matches(topics []Topic) bool { + // Mismatch if there aren't enough topics + if len(self.conditions) > len(topics) { + return false + } + // Check each topic condition for existence (skip wild-cards) + for i := 0; i < len(topics) && i < len(self.conditions); i++ { + if len(self.conditions[i]) > 0 { + if _, ok := self.conditions[i][topics[i]]; !ok { + return false + } + } } - return topicSet(set) + return true } diff --git a/whisper/topic_test.go b/whisper/topic_test.go index 4015079dcf..22ee060966 100644 --- a/whisper/topic_test.go +++ b/whisper/topic_test.go @@ -52,16 +52,149 @@ func TestTopicCreation(t *testing.T) { } } -func TestTopicSetCreation(t *testing.T) { - topics := make([]Topic, len(topicCreationTests)) - for i, tt := range topicCreationTests { - topics[i] = NewTopic(tt.data) +var topicMatcherCreationTest = struct { + binary [][][]byte + textual [][]string + matcher []map[[4]byte]struct{} +}{ + binary: [][][]byte{ + [][]byte{}, + [][]byte{ + []byte("Topic A"), + }, + [][]byte{ + []byte("Topic B1"), + []byte("Topic B2"), + []byte("Topic B3"), + }, + }, + textual: [][]string{ + []string{}, + []string{"Topic A"}, + []string{"Topic B1", "Topic B2", "Topic B3"}, + }, + matcher: []map[[4]byte]struct{}{ + map[[4]byte]struct{}{}, + map[[4]byte]struct{}{ + [4]byte{0x25, 0xfc, 0x95, 0x66}: struct{}{}, + }, + map[[4]byte]struct{}{ + [4]byte{0x93, 0x6d, 0xec, 0x09}: struct{}{}, + [4]byte{0x25, 0x23, 0x34, 0xd3}: struct{}{}, + [4]byte{0x6b, 0xc2, 0x73, 0xd1}: struct{}{}, + }, + }, +} + +func TestTopicMatcherCreation(t *testing.T) { + test := topicMatcherCreationTest + + matcher := newTopicMatcherFromBinary(test.binary...) + for i, cond := range matcher.conditions { + for topic, _ := range cond { + if _, ok := test.matcher[i][topic]; !ok { + t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:]) + } + } } - set := newTopicSet(topics) - for i, tt := range topicCreationTests { - topic := NewTopic(tt.data) - if _, ok := set[topic.String()]; !ok { - t.Errorf("topic %d: not found in set", i) + for i, cond := range test.matcher { + for topic, _ := range cond { + if _, ok := matcher.conditions[i][topic]; !ok { + t.Errorf("condition %d; topic not found: 0x%x", i, topic[:]) + } + } + } + + matcher = newTopicMatcherFromStrings(test.textual...) + for i, cond := range matcher.conditions { + for topic, _ := range cond { + if _, ok := test.matcher[i][topic]; !ok { + t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:]) + } + } + } + for i, cond := range test.matcher { + for topic, _ := range cond { + if _, ok := matcher.conditions[i][topic]; !ok { + t.Errorf("condition %d; topic not found: 0x%x", i, topic[:]) + } + } + } +} + +var topicMatcherTests = []struct { + filter [][]string + topics []string + match bool +}{ + // Empty topic matcher should match everything + { + filter: [][]string{}, + topics: []string{}, + match: true, + }, + { + filter: [][]string{}, + topics: []string{"a", "b", "c"}, + match: true, + }, + // Fixed topic matcher should match strictly, but only prefix + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a", "b"}, + match: true, + }, + { + filter: [][]string{[]string{"a"}, []string{"b"}}, + topics: []string{"a", "b", "c"}, + match: true, + }, + // Multi-matcher should match any from a sub-group + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a1"}, + match: true, + }, + { + filter: [][]string{[]string{"a1", "a2"}}, + topics: []string{"a2"}, + match: true, + }, + // Wild-card condition should match anything + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"a"}, + match: false, + }, + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"a", "b"}, + match: true, + }, + { + filter: [][]string{[]string{}, []string{"b"}}, + topics: []string{"b", "b"}, + match: true, + }, +} + +func TestTopicMatcher(t *testing.T) { + for i, tt := range topicMatcherTests { + topics := NewTopicsFromStrings(tt.topics...) + + matcher := newTopicMatcherFromStrings(tt.filter...) + if match := matcher.Matches(topics); match != tt.match { + t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match) } } } diff --git a/whisper/whisper.go b/whisper/whisper.go index 61999f07ad..5d6ee6e3b4 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -118,11 +118,11 @@ func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. func (self *Whisper) Watch(options Filter) int { - filter := filter.Generic{ - Str1: string(crypto.FromECDSAPub(options.To)), - Str2: string(crypto.FromECDSAPub(options.From)), - Data: newTopicSet(options.Topics), - Fn: func(data interface{}) { + filter := filterer{ + to: string(crypto.FromECDSAPub(options.To)), + from: string(crypto.FromECDSAPub(options.From)), + matcher: newTopicMatcher(options.Topics...), + fn: func(data interface{}) { options.Fn(data.(*Message)) }, } @@ -273,10 +273,14 @@ func (self *Whisper) open(envelope *Envelope) *Message { // createFilter creates a message filter to check against installed handlers. func createFilter(message *Message, topics []Topic) filter.Filter { - return filter.Generic{ - Str1: string(crypto.FromECDSAPub(message.To)), - Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: newTopicSet(topics), + matcher := make([][]Topic, len(topics)) + for i, topic := range topics { + matcher[i] = []Topic{topic} + } + return filterer{ + to: string(crypto.FromECDSAPub(message.To)), + from: string(crypto.FromECDSAPub(message.Recover())), + matcher: newTopicMatcher(matcher...), } } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index def8e68d8b..8fce0e0363 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -129,7 +129,7 @@ func testBroadcast(anonymous bool, t *testing.T) { dones[i] = done targets[i].Watch(Filter{ - Topics: NewTopicsFromStrings("broadcast topic"), + Topics: NewTopicFilterFromStrings([]string{"broadcast topic"}), Fn: func(msg *Message) { close(done) }, diff --git a/xeth/whisper.go b/xeth/whisper.go index 386897f395..25c4af3b18 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -67,11 +67,11 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. -func (self *Whisper) Watch(to, from string, topics []string, fn func(WhisperMessage)) int { +func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMessage)) int { filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(to)), From: crypto.ToECDSAPub(common.FromHex(from)), - Topics: whisper.NewTopicsFromStrings(topics...), + Topics: whisper.NewTopicFilterFromStrings(topics...), } filter.Fn = func(message *whisper.Message) { fn(NewWhisperMessage(message)) diff --git a/xeth/xeth.go b/xeth/xeth.go index 8cc32c9586..ea6ae9950a 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,7 +452,7 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(to, from string, topics []string) int { +func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { var id int callback := func(msg WhisperMessage) { p.messagesMut.Lock() From db615a85ec000dab7f73a6d4b1b46428ba4acdee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 12:50:48 +0300 Subject: [PATCH 09/14] ui/qt/qwhisper, whisper, xeth: polish topic filter, fix wildcards --- ui/qt/qwhisper/whisper.go | 2 +- whisper/filter.go | 60 +++++++++++++++ whisper/filter_test.go | 149 ++++++++++++++++++++++++++++++++++++++ whisper/topic.go | 42 +---------- whisper/topic_test.go | 3 +- whisper/whisper_test.go | 2 +- xeth/whisper.go | 2 +- 7 files changed, 215 insertions(+), 45 deletions(-) create mode 100644 whisper/filter_test.go diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go index b7409c57fc..4ab6d2e5a7 100644 --- a/ui/qt/qwhisper/whisper.go +++ b/ui/qt/qwhisper/whisper.go @@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) { if topicList, ok := opts["topics"].(*qml.List); ok { var topics []string topicList.Convert(&topics) - f.Topics = whisper.NewTopicFilterFromStringsFlat(topics...) + f.Topics = whisper.NewFilterTopicsFromStringsFlat(topics...) } return diff --git a/whisper/filter.go b/whisper/filter.go index 8a398ab76c..c946d93806 100644 --- a/whisper/filter.go +++ b/whisper/filter.go @@ -16,6 +16,66 @@ type Filter struct { Fn func(msg *Message) // Handler in case of a match } +// NewFilterTopics creates a 2D topic array used by whisper.Filter from binary +// data elements. +func NewFilterTopics(data ...[][]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + // Handle the special case of condition == [[]byte{}] + if len(condition) == 1 && len(condition[0]) == 0 { + filter[i] = []Topic{} + continue + } + // Otherwise flatten normally + filter[i] = NewTopics(condition...) + } + return filter +} + +// NewFilterTopicsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewFilterTopicsFlat(data ...[]byte) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + // Only add non-wildcard topics + filter[i] = make([]Topic, 0, 1) + if len(element) > 0 { + filter[i] = append(filter[i], NewTopic(element)) + } + } + return filter +} + +// NewFilterTopicsFromStrings creates a 2D topic array used by whisper.Filter +// from textual data elements. +func NewFilterTopicsFromStrings(data ...[]string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, condition := range data { + // Handle the special case of condition == [""] + if len(condition) == 1 && condition[0] == "" { + filter[i] = []Topic{} + continue + } + // Otherwise flatten normally + filter[i] = NewTopicsFromStrings(condition...) + } + return filter +} + +// NewFilterTopicsFromStringsFlat creates a 2D topic array used by whisper.Filter from flat +// binary data elements. +func NewFilterTopicsFromStringsFlat(data ...string) [][]Topic { + filter := make([][]Topic, len(data)) + for i, element := range data { + // Only add non-wildcard topics + filter[i] = make([]Topic, 0, 1) + if element != "" { + filter[i] = append(filter[i], NewTopicFromString(element)) + } + } + return filter +} + // filterer is the internal, fully initialized filter ready to match inbound // messages to a variety of criteria. type filterer struct { diff --git a/whisper/filter_test.go b/whisper/filter_test.go new file mode 100644 index 0000000000..ac0ebaba72 --- /dev/null +++ b/whisper/filter_test.go @@ -0,0 +1,149 @@ +package whisper + +import ( + "bytes" + + "testing" +) + +var filterTopicsCreationTests = []struct { + topics [][]string + filter [][][4]byte +}{ + { // Simple topic filter + topics: [][]string{ + {"abc", "def", "ghi"}, + {"def"}, + {"ghi", "abc"}, + }, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}}, + {{0x34, 0x60, 0x7c, 0x9b}}, + {{0x21, 0x41, 0x7d, 0xf9}, {0x4e, 0x03, 0x65, 0x7a}}, + }, + }, + { // Wild-carded topic filter + topics: [][]string{ + {"abc", "def", "ghi"}, + {}, + {""}, + {"def"}, + }, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}}, + {}, + {}, + {{0x34, 0x60, 0x7c, 0x9b}}, + }, + }, +} + +var filterTopicsCreationFlatTests = []struct { + topics []string + filter [][][4]byte +}{ + { // Simple topic list + topics: []string{"abc", "def", "ghi"}, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}}, + {{0x34, 0x60, 0x7c, 0x9b}}, + {{0x21, 0x41, 0x7d, 0xf9}}, + }, + }, + { // Wild-carded topic list + topics: []string{"abc", "", "ghi"}, + filter: [][][4]byte{ + {{0x4e, 0x03, 0x65, 0x7a}}, + {}, + {{0x21, 0x41, 0x7d, 0xf9}}, + }, + }, +} + +func TestFilterTopicsCreation(t *testing.T) { + // Check full filter creation + for i, tt := range filterTopicsCreationTests { + // Check the textual creation + filter := NewFilterTopicsFromStrings(tt.topics...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + // Check the binary creation + binary := make([][][]byte, len(tt.topics)) + for j, condition := range tt.topics { + binary[j] = make([][]byte, len(condition)) + for k, segment := range condition { + binary[j][k] = []byte(segment) + } + } + filter = NewFilterTopics(binary...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + } + // Check flat filter creation + for i, tt := range filterTopicsCreationFlatTests { + // Check the textual creation + filter := NewFilterTopicsFromStringsFlat(tt.topics...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + // Check the binary creation + binary := make([][]byte, len(tt.topics)) + for j, topic := range tt.topics { + binary[j] = []byte(topic) + } + filter = NewFilterTopicsFlat(binary...) + if len(filter) != len(tt.topics) { + t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics)) + continue + } + for j, condition := range filter { + if len(condition) != len(tt.filter[j]) { + t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j])) + continue + } + for k := 0; k < len(condition); k++ { + if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 { + t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k]) + } + } + } + } +} diff --git a/whisper/topic.go b/whisper/topic.go index b2a264e299..c47c94ae1d 100644 --- a/whisper/topic.go +++ b/whisper/topic.go @@ -11,6 +11,8 @@ import "github.com/ethereum/go-ethereum/crypto" type Topic [4]byte // NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data. +// +// Note, empty topics are considered the wildcard, and cannot be used in messages. func NewTopic(data []byte) Topic { prefix := [4]byte{} copy(prefix[:], crypto.Sha3(data)[:4]) @@ -27,26 +29,6 @@ func NewTopics(data ...[]byte) []Topic { return topics } -// NewTopicFilter creates a 2D topic array used by whisper.Filter from binary -// data elements. -func NewTopicFilter(data ...[][]byte) [][]Topic { - filter := make([][]Topic, len(data)) - for i, condition := range data { - filter[i] = NewTopics(condition...) - } - return filter -} - -// NewTopicFilterFlat creates a 2D topic array used by whisper.Filter from flat -// binary data elements. -func NewTopicFilterFlat(data ...[]byte) [][]Topic { - filter := make([][]Topic, len(data)) - for i, element := range data { - filter[i] = []Topic{NewTopic(element)} - } - return filter -} - // NewTopicFromString creates a topic using the binary data contents of the // specified string. func NewTopicFromString(data string) Topic { @@ -63,26 +45,6 @@ func NewTopicsFromStrings(data ...string) []Topic { return topics } -// NewTopicFilterFromStrings creates a 2D topic array used by whisper.Filter -// from textual data elements. -func NewTopicFilterFromStrings(data ...[]string) [][]Topic { - filter := make([][]Topic, len(data)) - for i, condition := range data { - filter[i] = NewTopicsFromStrings(condition...) - } - return filter -} - -// NewTopicFilterFromStringsFlat creates a 2D topic array used by whisper.Filter from flat -// binary data elements. -func NewTopicFilterFromStringsFlat(data ...string) [][]Topic { - filter := make([][]Topic, len(data)) - for i, element := range data { - filter[i] = []Topic{NewTopicFromString(element)} - } - return filter -} - // String converts a topic byte array to a string representation. func (self *Topic) String() string { return string(self[:]) diff --git a/whisper/topic_test.go b/whisper/topic_test.go index 22ee060966..976f3e88d8 100644 --- a/whisper/topic_test.go +++ b/whisper/topic_test.go @@ -9,9 +9,8 @@ var topicCreationTests = []struct { data []byte hash [4]byte }{ - {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil}, - {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}}, {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")}, + {hash: [4]byte{0xf2, 0x6e, 0x77, 0x79}, data: []byte("some other test")}, } func TestTopicCreation(t *testing.T) { diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 8fce0e0363..7c5067f511 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -129,7 +129,7 @@ func testBroadcast(anonymous bool, t *testing.T) { dones[i] = done targets[i].Watch(Filter{ - Topics: NewTopicFilterFromStrings([]string{"broadcast topic"}), + Topics: NewFilterTopicsFromStringsFlat("broadcast topic"), Fn: func(msg *Message) { close(done) }, diff --git a/xeth/whisper.go b/xeth/whisper.go index 25c4af3b18..36c6ca63ff 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -71,7 +71,7 @@ func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMe filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(to)), From: crypto.ToECDSAPub(common.FromHex(from)), - Topics: whisper.NewTopicFilterFromStrings(topics...), + Topics: whisper.NewFilterTopicsFromStrings(topics...), } filter.Fn = func(message *whisper.Message) { fn(NewWhisperMessage(message)) From e252dae499a8c682c92527262cca01af337438aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 17:24:35 +0300 Subject: [PATCH 10/14] rpc: use nil topic wildcards instead of "" --- rpc/args.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/args.go b/rpc/args.go index 8df483f088..4bd48e6d61 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -1061,7 +1061,7 @@ func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) { for idx, field := range list { switch value := field.(type) { case nil: - topics[idx] = []string{""} + topics[idx] = []string{} case string: topics[idx] = []string{value} From 70ded4cbf06d19993d829d843a27002cf181c619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 17:25:54 +0300 Subject: [PATCH 11/14] xeth: fix un-decoded whisper RPC topic string bug --- xeth/whisper.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/xeth/whisper.go b/xeth/whisper.go index 36c6ca63ff..edb62c7485 100644 --- a/xeth/whisper.go +++ b/xeth/whisper.go @@ -39,12 +39,17 @@ func (self *Whisper) HasIdentity(key string) bool { // Post injects a message into the whisper network for distribution. func (self *Whisper) Post(payload string, to, from string, topics []string, priority, ttl uint32) error { + // Decode the topic strings + topicsDecoded := make([][]byte, len(topics)) + for i, topic := range topics { + topicsDecoded[i] = common.FromHex(topic) + } // Construct the whisper message and transmission options message := whisper.NewMessage(common.FromHex(payload)) options := whisper.Options{ To: crypto.ToECDSAPub(common.FromHex(to)), TTL: time.Duration(ttl) * time.Second, - Topics: whisper.NewTopicsFromStrings(topics...), + Topics: whisper.NewTopics(topicsDecoded...), } if len(from) != 0 { if key := self.Whisper.GetIdentity(crypto.ToECDSAPub(common.FromHex(from))); key != nil { @@ -68,10 +73,19 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio // Watch installs a new message handler to run in case a matching packet arrives // from the whisper network. func (self *Whisper) Watch(to, from string, topics [][]string, fn func(WhisperMessage)) int { + // Decode the topic strings + topicsDecoded := make([][][]byte, len(topics)) + for i, condition := range topics { + topicsDecoded[i] = make([][]byte, len(condition)) + for j, topic := range condition { + topicsDecoded[i][j] = common.FromHex(topic) + } + } + // Assemble and inject the filter into the whisper client filter := whisper.Filter{ To: crypto.ToECDSAPub(common.FromHex(to)), From: crypto.ToECDSAPub(common.FromHex(from)), - Topics: whisper.NewFilterTopicsFromStrings(topics...), + Topics: whisper.NewFilterTopics(topicsDecoded...), } filter.Fn = func(message *whisper.Message) { fn(NewWhisperMessage(message)) From 406e74e2afdce374fd227873f4eb96a0ba99bd23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 17:40:39 +0300 Subject: [PATCH 12/14] whisper: fix a small data race duirng peer connection --- whisper/peer.go | 8 ++------ whisper/whisper.go | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/whisper/peer.go b/whisper/peer.go index 28abf42605..77e09becea 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -23,18 +23,14 @@ type peer struct { // newPeer creates and initializes a new whisper peer connection, returning either // the newly constructed link or a failure reason. -func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { - p := &peer{ +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return &peer{ host: host, peer: remote, ws: rw, known: set.New(), quit: make(chan struct{}), } - if err := p.handshake(); err != nil { - return nil, err - } - return p, nil } // start initiates the peer updater, periodically broadcasting the whisper packets diff --git a/whisper/whisper.go b/whisper/whisper.go index 5d6ee6e3b4..a48e1e380d 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -168,15 +168,9 @@ func (self *Whisper) Messages(id int) []*Message { // handlePeer is called by the underlying P2P layer when the whisper sub-protocol // connection is negotiated. func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { - // Create, initialize and start the whisper peer - whisperPeer, err := newPeer(self, peer, rw) - if err != nil { - return err - } - whisperPeer.start() - defer whisperPeer.stop() + // Create the new peer and start tracking it + whisperPeer := newPeer(self, peer, rw) - // Start tracking the active peer self.peerMu.Lock() self.peers[whisperPeer] = struct{}{} self.peerMu.Unlock() @@ -186,6 +180,14 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { delete(self.peers, whisperPeer) self.peerMu.Unlock() }() + + // Run the peer handshake and state updates + if err := whisperPeer.handshake(); err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + // Read and process inbound messages directly to merge into client-global state for { // Fetch the next packet and decode the contained envelopes From 2b9fd6b40a759078d1dcc1c7edcfafd4ccf38af3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 18:17:07 +0300 Subject: [PATCH 13/14] whisper: add full filter test suite --- whisper/filter_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++ whisper/peer.go | 3 +-- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/whisper/filter_test.go b/whisper/filter_test.go index ac0ebaba72..ca28fd83c0 100644 --- a/whisper/filter_test.go +++ b/whisper/filter_test.go @@ -147,3 +147,53 @@ func TestFilterTopicsCreation(t *testing.T) { } } } + +var filterCompareTests = []struct { + matcher filterer + message filterer + match bool +}{ + { // Wild-card filter matching anything + matcher: filterer{to: "", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter matching the to field + matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the to field + matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()}, + message: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: false, + }, + { // Filter matching the from field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the from field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()}, + message: filterer{to: "to", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: false, + }, + { // Filter matching the topic field + matcher: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + match: true, + }, + { // Filter rejecting the topic field + matcher: filterer{to: "", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)}, + message: filterer{to: "to", from: "from", matcher: newTopicMatcher()}, + match: false, + }, +} + +func TestFilterCompare(t *testing.T) { + for i, tt := range filterCompareTests { + if match := tt.matcher.Compare(tt.message); match != tt.match { + t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match) + } + } +} diff --git a/whisper/peer.go b/whisper/peer.go index 77e09becea..9fdc284346 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -21,8 +21,7 @@ type peer struct { quit chan struct{} } -// newPeer creates and initializes a new whisper peer connection, returning either -// the newly constructed link or a failure reason. +// newPeer creates a new whisper peer object, but does not run the handshake itself. func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer { return &peer{ host: host, From 978ffd3097242a5faeb7b23b9c72590170004dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 18:35:50 +0300 Subject: [PATCH 14/14] rpc, xeth: finish cleaning up xeth --- rpc/api.go | 13 ++++++++----- xeth/xeth.go | 40 ++++++++++++++++++++++++++++------------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/rpc/api.go b/rpc/api.go index 4da2fb17a3..4a9eb59633 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -406,10 +406,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err res, _ := api.xeth().DbGet([]byte(args.Database + args.Key)) *reply = newHexData(res) + case "shh_version": + // Retrieves the currently running whisper protocol version *reply = api.xeth().WhisperVersion() case "shh_post": + // Injects a new message into the whisper network args := new(WhisperMessageArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err @@ -421,18 +424,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = true case "shh_newIdentity": + // Creates a new whisper identity to use for sending/receiving messages *reply = api.xeth().Whisper().NewIdentity() case "shh_hasIdentity": + // Checks if an identity if owned or not args := new(WhisperIdentityArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().Whisper().HasIdentity(args.Identity) - case "shh_newGroup", "shh_addToGroup": - return NewNotImplementedError(req.Method) - case "shh_newFilter": // Create a new filter to watch and match messages with args := new(WhisperFilterArgs) @@ -443,6 +445,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = newHexNum(big.NewInt(int64(id)).Bytes()) case "shh_uninstallFilter": + // Remove an existing filter watching messages args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err @@ -455,7 +458,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().MessagesChanged(args.Id) + *reply = api.xeth().WhisperMessagesChanged(args.Id) case "shh_getMessages": // Retrieve all the cached messages matching a specific, existing filter @@ -463,7 +466,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().Messages(args.Id) + *reply = api.xeth().WhisperMessages(args.Id) // case "eth_register": // // Placeholder for actual type diff --git a/xeth/xeth.go b/xeth/xeth.go index ea6ae9950a..e4040d9d80 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,44 +452,60 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } +// NewWhisperFilter creates and registers a new message filter to watch for +// inbound whisper messages. All parameters at this point are assumed to be +// HEX encoded. func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { + // Pre-define the id to be filled later var id int - callback := func(msg WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() + // Callback to delegate core whisper messages to this xeth filter + callback := func(msg WhisperMessage) { + p.messagesMut.RLock() // Only read lock to the filter pool + defer p.messagesMut.RUnlock() p.messages[id].insert(msg) } + // Initialize the core whisper filter and wrap into xeth id = p.Whisper().Watch(to, from, topics, callback) + + p.messagesMut.Lock() p.messages[id] = newWhisperFilter(id, p.Whisper()) + p.messagesMut.Unlock() + return id } +// UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + if _, ok := p.messages[id]; ok { delete(p.messages, id) return true } - return false } -func (self *XEth) MessagesChanged(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessages retrieves all the known messages that match a specific filter. +func (self *XEth) WhisperMessages(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].retrieve() + return self.messages[id].messages() } return nil } -func (self *XEth) Messages(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessagesChanged retrieves all the new messages matched by a filter +// since the last retrieval +func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].messages() + return self.messages[id].retrieve() } return nil }