From 978ffd3097242a5faeb7b23b9c72590170004dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Apr 2015 18:35:50 +0300 Subject: [PATCH] rpc, xeth: finish cleaning up xeth --- rpc/api.go | 13 ++++++++----- xeth/xeth.go | 40 ++++++++++++++++++++++++++++------------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/rpc/api.go b/rpc/api.go index 4da2fb17a..4a9eb5963 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -406,10 +406,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err res, _ := api.xeth().DbGet([]byte(args.Database + args.Key)) *reply = newHexData(res) + case "shh_version": + // Retrieves the currently running whisper protocol version *reply = api.xeth().WhisperVersion() case "shh_post": + // Injects a new message into the whisper network args := new(WhisperMessageArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err @@ -421,18 +424,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = true case "shh_newIdentity": + // Creates a new whisper identity to use for sending/receiving messages *reply = api.xeth().Whisper().NewIdentity() case "shh_hasIdentity": + // Checks if an identity if owned or not args := new(WhisperIdentityArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err } *reply = api.xeth().Whisper().HasIdentity(args.Identity) - case "shh_newGroup", "shh_addToGroup": - return NewNotImplementedError(req.Method) - case "shh_newFilter": // Create a new filter to watch and match messages with args := new(WhisperFilterArgs) @@ -443,6 +445,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err *reply = newHexNum(big.NewInt(int64(id)).Bytes()) case "shh_uninstallFilter": + // Remove an existing filter watching messages args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { return err @@ -455,7 +458,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().MessagesChanged(args.Id) + *reply = api.xeth().WhisperMessagesChanged(args.Id) case "shh_getMessages": // Retrieve all the cached messages matching a specific, existing filter @@ -463,7 +466,7 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = api.xeth().Messages(args.Id) + *reply = api.xeth().WhisperMessages(args.Id) // case "eth_register": // // Placeholder for actual type diff --git a/xeth/xeth.go b/xeth/xeth.go index ea6ae9950..e4040d9d8 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -452,44 +452,60 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } +// NewWhisperFilter creates and registers a new message filter to watch for +// inbound whisper messages. All parameters at this point are assumed to be +// HEX encoded. func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { + // Pre-define the id to be filled later var id int - callback := func(msg WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() + // Callback to delegate core whisper messages to this xeth filter + callback := func(msg WhisperMessage) { + p.messagesMut.RLock() // Only read lock to the filter pool + defer p.messagesMut.RUnlock() p.messages[id].insert(msg) } + // Initialize the core whisper filter and wrap into xeth id = p.Whisper().Watch(to, from, topics, callback) + + p.messagesMut.Lock() p.messages[id] = newWhisperFilter(id, p.Whisper()) + p.messagesMut.Unlock() + return id } +// UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + if _, ok := p.messages[id]; ok { delete(p.messages, id) return true } - return false } -func (self *XEth) MessagesChanged(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessages retrieves all the known messages that match a specific filter. +func (self *XEth) WhisperMessages(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].retrieve() + return self.messages[id].messages() } return nil } -func (self *XEth) Messages(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessagesChanged retrieves all the new messages matched by a filter +// since the last retrieval +func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].messages() + return self.messages[id].retrieve() } return nil }