From 982f73fa6d6f12874729faacd0db14fc78d518dd Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 20 Feb 2015 12:59:54 +0100 Subject: [PATCH] Added timeout for filter & removed clipboard. Closes #350 --- rpc/packages.go | 72 ++++++++++++++++++++++++++++------- rpc/packages_test.go | 37 ++++++++++++++++++ rpc/util.go | 33 ++++++++++++++++ ui/qt/clipboard/capi.hpp | 11 ------ ui/qt/clipboard/clipboard.cpp | 20 ---------- ui/qt/clipboard/clipboard.go | 15 -------- ui/qt/clipboard/clipboard.hpp | 23 ----------- 7 files changed, 129 insertions(+), 82 deletions(-) create mode 100644 rpc/packages_test.go delete mode 100644 ui/qt/clipboard/capi.hpp delete mode 100644 ui/qt/clipboard/clipboard.cpp delete mode 100644 ui/qt/clipboard/clipboard.go delete mode 100644 ui/qt/clipboard/clipboard.hpp diff --git a/rpc/packages.go b/rpc/packages.go index 63eea54d6d..7411392c2f 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -13,6 +13,7 @@ import ( "math/big" "strings" "sync" + "time" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -31,13 +32,14 @@ const ( type EthereumApi struct { xeth *xeth.XEth + quit chan struct{} filterManager *filter.FilterManager logMut sync.RWMutex - logs map[int]state.Logs + logs map[int]*logFilter messagesMut sync.RWMutex - messages map[int][]xeth.WhisperMessage + messages map[int]*whisperFilter // Register keeps a list of accounts and transaction data regmut sync.Mutex register map[string][]*NewTxArgs @@ -49,12 +51,14 @@ func NewEthereumApi(eth *xeth.XEth) *EthereumApi { db, _ := ethdb.NewLDBDatabase("dapps") api := &EthereumApi{ xeth: eth, + quit: make(chan struct{}), filterManager: filter.NewFilterManager(eth.Backend().EventMux()), - logs: make(map[int]state.Logs), - messages: make(map[int][]xeth.WhisperMessage), + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), db: db, } go api.filterManager.Start() + go api.start() return api } @@ -97,7 +101,11 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro self.logMut.Lock() defer self.logMut.Unlock() - self.logs[id] = append(self.logs[id], logs...) + if self.logs[id] == nil { + self.logs[id] = &logFilter{timeout: time.Now()} + } + + self.logs[id].add(logs...) } id = self.filterManager.InstallFilter(filter) *reply = id @@ -113,7 +121,11 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error self.logMut.Lock() defer self.logMut.Unlock() - self.logs[id] = append(self.logs[id], &state.StateLog{}) + if self.logs[id] == nil { + self.logs[id] = &logFilter{timeout: time.Now()} + } + + self.logs[id].add(&state.StateLog{}) } if args == "pending" { filter.PendingCallback = callback @@ -131,9 +143,9 @@ func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { self.logMut.Lock() defer self.logMut.Unlock() - *reply = toLogs(self.logs[id]) - - self.logs[id] = nil // empty the logs + if self.logs[id] != nil { + *reply = toLogs(self.logs[id].get()) + } return nil } @@ -331,7 +343,10 @@ func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) e args.Fn = func(msg xeth.WhisperMessage) { p.messagesMut.Lock() defer p.messagesMut.Unlock() - p.messages[id] = append(p.messages[id], msg) + if p.messages[id] == nil { + p.messages[id] = &whisperFilter{timeout: time.Now()} + } + p.messages[id].add(msg) // = append(p.messages[id], msg) } id = p.xeth.Whisper().Watch(args) *reply = id @@ -342,9 +357,9 @@ func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error { self.messagesMut.Lock() defer self.messagesMut.Unlock() - *reply = self.messages[id] - - self.messages[id] = nil // empty the messages + if self.messages[id] != nil { + *reply = self.messages[id].get() + } return nil } @@ -535,3 +550,34 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error rpclogger.DebugDetailf("Reply: %T %s", reply, reply) return nil } + +var filterTickerTime = 15 * time.Second + +func (self *EthereumApi) start() { + timer := time.NewTicker(filterTickerTime) +done: + for { + select { + case <-timer.C: + self.logMut.Lock() + self.messagesMut.Lock() + for id, filter := range self.logs { + if time.Since(filter.timeout) > 20*time.Second { + delete(self.logs, id) + } + } + + for id, filter := range self.messages { + if time.Since(filter.timeout) > 20*time.Second { + delete(self.messages, id) + } + } + case <-self.quit: + break done + } + } +} + +func (self *EthereumApi) stop() { + close(self.quit) +} diff --git a/rpc/packages_test.go b/rpc/packages_test.go new file mode 100644 index 0000000000..037fd78b39 --- /dev/null +++ b/rpc/packages_test.go @@ -0,0 +1,37 @@ +package rpc + +import ( + "sync" + "testing" + "time" +) + +func TestFilterClose(t *testing.T) { + api := &EthereumApi{ + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), + quit: make(chan struct{}), + } + + filterTickerTime = 1 + api.logs[0] = &logFilter{} + api.messages[0] = &whisperFilter{} + var wg sync.WaitGroup + wg.Add(1) + go api.start() + go func() { + select { + case <-time.After(500 * time.Millisecond): + api.stop() + wg.Done() + } + }() + wg.Wait() + if len(api.logs) != 0 { + t.Error("expected logs to be empty") + } + + if len(api.messages) != 0 { + t.Error("expected messages to be empty") + } +} diff --git a/rpc/util.go b/rpc/util.go index 366e315ac2..29824bcdb9 100644 --- a/rpc/util.go +++ b/rpc/util.go @@ -20,10 +20,12 @@ import ( "encoding/json" "io" "net/http" + "time" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/state" + "github.com/ethereum/go-ethereum/xeth" ) var rpclogger = logger.NewLogger("RPC") @@ -100,3 +102,34 @@ func toLogs(logs state.Logs) (ls []Log) { return } + +type whisperFilter struct { + messages []xeth.WhisperMessage + timeout time.Time +} + +func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) { + w.messages = append(w.messages, msgs...) +} +func (w *whisperFilter) get() []xeth.WhisperMessage { + w.timeout = time.Now() + tmp := w.messages + w.messages = nil + return tmp +} + +type logFilter struct { + logs state.Logs + timeout time.Time +} + +func (l *logFilter) add(logs ...state.Log) { + l.logs = append(l.logs, logs...) +} + +func (l *logFilter) get() state.Logs { + l.timeout = time.Now() + tmp := l.logs + l.logs = nil + return tmp +} diff --git a/ui/qt/clipboard/capi.hpp b/ui/qt/clipboard/capi.hpp deleted file mode 100644 index 2026134691..0000000000 --- a/ui/qt/clipboard/capi.hpp +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -#include "clipboard.hpp" - -typedef void Clipboard_; - -Clipboard_ *initClipboard() -{ - Clipboard *clipboard = new(Clipboard); - return static_cast(clipboard); -} diff --git a/ui/qt/clipboard/clipboard.cpp b/ui/qt/clipboard/clipboard.cpp deleted file mode 100644 index 192aa7a2b9..0000000000 --- a/ui/qt/clipboard/clipboard.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include "clipboard.h" - -#include - -Clipboard::Clipboard() -{ - connect(QApplication::clipboard(), &QClipboard::dataChanged, [this] { emit clipboardChanged();}); -} - -QString Clipboard::get() const -{ - QClipboard *clipboard = QApplication::clipboard(); - return clipboard->text(); -} - -void Clipboard::toClipboard(QString _text) -{ - QClipboard *clipboard = QApplicationion::clipboard(); - clipboard->setText(_text); -} diff --git a/ui/qt/clipboard/clipboard.go b/ui/qt/clipboard/clipboard.go deleted file mode 100644 index 064ee954d0..0000000000 --- a/ui/qt/clipboard/clipboard.go +++ /dev/null @@ -1,15 +0,0 @@ -package clipboard - -// #cgo CPPFLAGS: -I./ -// #cgo CXXFLAGS: -std=c++0x -pedantic-errors -Wall -fno-strict-aliasing -// #cgo LDFLAGS: -lstdc++ -// #cgo pkg-config: Qt5Quick -// -// #include "capi.hpp" -import "C" - -import "github.com/obscuren/qml" - -func SetQMLClipboard(context *qml.Context) { - context.SetVar("clipboard", (unsafe.Pointer)(C.initClipboard())) -} diff --git a/ui/qt/clipboard/clipboard.hpp b/ui/qt/clipboard/clipboard.hpp deleted file mode 100644 index 1aa213ceb1..0000000000 --- a/ui/qt/clipboard/clipboard.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" { -#endif - -class Clipboard : public QObject -{ - Q_OBJECT - Q_PROPERTY(QString get READ get WRITE toClipboard NOTIFY clipboardChanged) -public: - Clipboard(); - virtual ~Clipboard(){} - - Q_INVOKABLE void toClipboard(QString _text); - -signals: - void clipboardChanged(); -}; - -#ifdef __cplusplus -} // extern "C" -#endif