diff --git a/core/filter.go b/core/filter.go index c10fb7eebc..2ca57da654 100644 --- a/core/filter.go +++ b/core/filter.go @@ -22,9 +22,9 @@ type Filter struct { max int topics [][]common.Hash - BlockCallback func(*types.Block, state.Logs) - PendingCallback func(*types.Transaction) - LogsCallback func(state.Logs) + BlockCallback func(*types.Block, state.Logs) + TransactionCallback func(*types.Transaction) + LogsCallback func(state.Logs) } // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go index a1abe34181..b0d5078a23 100644 --- a/event/filter/eth_filter.go +++ b/event/filter/eth_filter.go @@ -88,8 +88,8 @@ out: case core.TxPreEvent: self.filterMu.RLock() for _, filter := range self.filters { - if filter.PendingCallback != nil { - filter.PendingCallback(event.Tx) + if filter.TransactionCallback != nil { + filter.TransactionCallback(event.Tx) } } self.filterMu.RUnlock() diff --git a/rpc/api.go b/rpc/api.go index b79a1306ef..a3312075b9 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -322,14 +322,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err return err } - id := api.xeth().RegisterFilter(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics) + id := api.xeth().NewLogFilter(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics) *reply = newHexNum(big.NewInt(int64(id)).Bytes()) + case "eth_newBlockFilter": - args := new(FilterStringArgs) - if err := json.Unmarshal(req.Params, &args); err != nil { - return err - } - *reply = newHexNum(api.xeth().NewFilterString(args.Word)) + *reply = newHexNum(api.xeth().NewBlockFilter()) + case "eth_transactionFilter": + *reply = newHexNum(api.xeth().NewTransactionFilter()) case "eth_uninstallFilter": args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { @@ -341,7 +340,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err if err := json.Unmarshal(req.Params, &args); err != nil { return err } - *reply = NewLogsRes(api.xeth().FilterChanged(args.Id)) + + switch api.xeth().GetFilterType(args.Id) { + case xeth.BlockFilterTy: + *reply = NewHashesRes(api.xeth().BlockFilterChanged(args.Id)) + case xeth.TransactionFilterTy: + *reply = NewHashesRes(api.xeth().TransactionFilterChanged(args.Id)) + case xeth.LogFilterTy: + *reply = NewLogsRes(api.xeth().LogFilterChanged(args.Id)) + default: + *reply = []string{} // reply empty string slice + } case "eth_getFilterLogs": args := new(FilterIdArgs) if err := json.Unmarshal(req.Params, &args); err != nil { diff --git a/rpc/responses.go b/rpc/responses.go index 884b7e69ba..9fdf60c022 100644 --- a/rpc/responses.go +++ b/rpc/responses.go @@ -3,6 +3,7 @@ package rpc import ( "encoding/json" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" ) @@ -303,3 +304,13 @@ func NewLogsRes(logs state.Logs) (ls []LogRes) { return } + +func NewHashesRes(hs []common.Hash) []string { + hashes := make([]string, len(hs)) + + for i, hash := range hs { + hashes[i] = hash.Hex() + } + + return hashes +} diff --git a/xeth/xeth.go b/xeth/xeth.go index ac59069d57..a0b936b570 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -29,6 +29,14 @@ var ( defaultGas = big.NewInt(90000) //500000 ) +// byte will be inferred +const ( + UnknownFilterTy = iota + BlockFilterTy + TransactionFilterTy + LogFilterTy +) + func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) } func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) } @@ -42,11 +50,17 @@ type XEth struct { quit chan struct{} filterManager *filter.FilterManager - logMut sync.RWMutex - logs map[int]*logFilter + logMu sync.RWMutex + logQueue map[int]*logQueue + + blockMu sync.RWMutex + blockQueue map[int]*hashQueue + + transactionMu sync.RWMutex + transactionQueue map[int]*hashQueue - messagesMut sync.RWMutex - messages map[int]*whisperFilter + messagesMu sync.RWMutex + messages map[int]*whisperFilter // regmut sync.Mutex // register map[string][]*interface{} // TODO improve return type @@ -59,14 +73,16 @@ type XEth struct { // confirms all transactions will be used. func New(eth *eth.Ethereum, frontend Frontend) *XEth { xeth := &XEth{ - backend: eth, - frontend: frontend, - whisper: NewWhisper(eth.Whisper()), - quit: make(chan struct{}), - filterManager: filter.NewFilterManager(eth.EventMux()), - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - agent: miner.NewRemoteAgent(), + backend: eth, + frontend: frontend, + whisper: NewWhisper(eth.Whisper()), + quit: make(chan struct{}), + filterManager: filter.NewFilterManager(eth.EventMux()), + logQueue: make(map[int]*logQueue), + blockQueue: make(map[int]*hashQueue), + transactionQueue: make(map[int]*hashQueue), + messages: make(map[int]*whisperFilter), + agent: miner.NewRemoteAgent(), } eth.Miner().Register(xeth.agent) @@ -87,23 +103,41 @@ done: for { select { case <-timer.C: - self.logMut.Lock() - self.messagesMut.Lock() - for id, filter := range self.logs { + self.logMu.Lock() + for id, filter := range self.logQueue { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.logQueue, id) + } + } + self.logMu.Unlock() + + self.blockMu.Lock() + for id, filter := range self.blockQueue { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.blockQueue, id) + } + } + self.blockMu.Unlock() + + self.transactionMu.Lock() + for id, filter := range self.transactionQueue { if time.Since(filter.timeout) > filterTickerTime { self.filterManager.UninstallFilter(id) - delete(self.logs, id) + delete(self.transactionQueue, id) } } + self.transactionMu.Unlock() + self.messagesMu.Lock() for id, filter := range self.messages { if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } } - self.messagesMut.Unlock() - self.logMut.Unlock() + self.messagesMu.Unlock() case <-self.quit: break done } @@ -360,7 +394,32 @@ func (self *XEth) SecretToAddress(key string) string { return common.ToHex(pair.Address()) } -func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { +func (self *XEth) UninstallFilter(id int) bool { + defer self.filterManager.UninstallFilter(id) + + if _, ok := self.logQueue[id]; ok { + self.logMu.Lock() + defer self.logMu.Unlock() + delete(self.logQueue, id) + return true + } + if _, ok := self.blockQueue[id]; ok { + self.blockMu.Lock() + defer self.blockMu.Unlock() + delete(self.blockQueue, id) + return true + } + if _, ok := self.transactionQueue[id]; ok { + self.transactionMu.Lock() + defer self.transactionMu.Unlock() + delete(self.transactionQueue, id) + return true + } + + return false +} + +func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { var id int filter := core.NewFilter(self.backend) filter.SetEarliestBlock(earliest) @@ -370,71 +429,90 @@ func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address filter.SetAddress(cAddress(address)) filter.SetTopics(cTopics(topics)) filter.LogsCallback = func(logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() + self.logMu.Lock() + defer self.logMu.Unlock() - self.logs[id].add(logs...) + self.logQueue[id].add(logs...) } id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + self.logQueue[id] = &logQueue{timeout: time.Now()} return id } -func (self *XEth) UninstallFilter(id int) bool { - if _, ok := self.logs[id]; ok { - delete(self.logs, id) - self.filterManager.UninstallFilter(id) - return true - } +func (self *XEth) NewTransactionFilter() int { + var id int + filter := core.NewFilter(self.backend) + filter.TransactionCallback = func(tx *types.Transaction) { + self.transactionMu.Lock() + defer self.transactionMu.Unlock() - return false + self.transactionQueue[id].add(tx.Hash()) + } + id = self.filterManager.InstallFilter(filter) + self.transactionQueue[id] = &hashQueue{timeout: time.Now()} + return id } -func (self *XEth) NewFilterString(word string) int { +func (self *XEth) NewBlockFilter() int { var id int filter := core.NewFilter(self.backend) + filter.BlockCallback = func(block *types.Block, logs state.Logs) { + self.blockMu.Lock() + defer self.blockMu.Unlock() - switch word { - case "pending": - filter.PendingCallback = func(tx *types.Transaction) { - self.logMut.Lock() - defer self.logMut.Unlock() - - self.logs[id].add(&state.Log{}) - } - case "latest": - filter.BlockCallback = func(block *types.Block, logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() + self.blockQueue[id].add(block.Hash()) + } + id = self.filterManager.InstallFilter(filter) + self.blockQueue[id] = &hashQueue{timeout: time.Now()} + return id +} - for _, log := range logs { - self.logs[id].add(log) - } - self.logs[id].add(&state.Log{}) - } +func (self *XEth) GetFilterType(id int) byte { + if _, ok := self.blockQueue[id]; ok { + return BlockFilterTy + } else if _, ok := self.transactionQueue[id]; ok { + return TransactionFilterTy + } else if _, ok := self.logQueue[id]; ok { + return LogFilterTy } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + return UnknownFilterTy +} - return id +func (self *XEth) LogFilterChanged(id int) state.Logs { + self.logMu.Lock() + defer self.logMu.Unlock() + + if self.logQueue[id] != nil { + return self.logQueue[id].get() + } + return nil } -func (self *XEth) FilterChanged(id int) state.Logs { - self.logMut.Lock() - defer self.logMut.Unlock() +func (self *XEth) BlockFilterChanged(id int) []common.Hash { + self.blockMu.Lock() + defer self.blockMu.Unlock() - if self.logs[id] != nil { - return self.logs[id].get() + if self.blockQueue[id] != nil { + return self.blockQueue[id].get() } + return nil +} + +func (self *XEth) TransactionFilterChanged(id int) []common.Hash { + self.blockMu.Lock() + defer self.blockMu.Unlock() + if self.blockQueue[id] != nil { + return self.transactionQueue[id].get() + } return nil } func (self *XEth) Logs(id int) state.Logs { - self.logMut.Lock() - defer self.logMut.Unlock() + self.logMu.Lock() + defer self.logMu.Unlock() filter := self.filterManager.GetFilter(id) if filter != nil { @@ -465,24 +543,24 @@ func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { // Callback to delegate core whisper messages to this xeth filter callback := func(msg WhisperMessage) { - p.messagesMut.RLock() // Only read lock to the filter pool - defer p.messagesMut.RUnlock() + p.messagesMu.RLock() // Only read lock to the filter pool + defer p.messagesMu.RUnlock() p.messages[id].insert(msg) } // Initialize the core whisper filter and wrap into xeth id = p.Whisper().Watch(to, from, topics, callback) - p.messagesMut.Lock() + p.messagesMu.Lock() p.messages[id] = newWhisperFilter(id, p.Whisper()) - p.messagesMut.Unlock() + p.messagesMu.Unlock() return id } // UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() + p.messagesMu.Lock() + defer p.messagesMu.Unlock() if _, ok := p.messages[id]; ok { delete(p.messages, id) @@ -493,8 +571,8 @@ func (p *XEth) UninstallWhisperFilter(id int) bool { // WhisperMessages retrieves all the known messages that match a specific filter. func (self *XEth) WhisperMessages(id int) []WhisperMessage { - self.messagesMut.RLock() - defer self.messagesMut.RUnlock() + self.messagesMu.RLock() + defer self.messagesMu.RUnlock() if self.messages[id] != nil { return self.messages[id].messages() @@ -505,8 +583,8 @@ func (self *XEth) WhisperMessages(id int) []WhisperMessage { // WhisperMessagesChanged retrieves all the new messages matched by a filter // since the last retrieval func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { - self.messagesMut.RLock() - defer self.messagesMut.RUnlock() + self.messagesMu.RLock() + defer self.messagesMu.RUnlock() if self.messages[id] != nil { return self.messages[id].retrieve() @@ -767,19 +845,36 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type logFilter struct { +type logQueue struct { logs state.Logs timeout time.Time id int } -func (l *logFilter) add(logs ...*state.Log) { +func (l *logQueue) add(logs ...*state.Log) { l.logs = append(l.logs, logs...) } -func (l *logFilter) get() state.Logs { +func (l *logQueue) get() state.Logs { l.timeout = time.Now() tmp := l.logs l.logs = nil return tmp } + +type hashQueue struct { + hashes []common.Hash + timeout time.Time + id int +} + +func (l *hashQueue) add(hashes ...common.Hash) { + l.hashes = append(l.hashes, hashes...) +} + +func (l *hashQueue) get() []common.Hash { + l.timeout = time.Now() + tmp := l.hashes + l.hashes = nil + return tmp +}