From 6c04c19eb4506efa5f6de47561025b3702619f79 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Thu, 19 Mar 2015 22:58:07 -0400 Subject: [PATCH] Reorg filter logic to XEth --- core/filter.go | 2 +- event/filter/eth_filter.go | 4 +- rpc/api.go | 191 +++++----------------------------- rpc/api_test.go | 64 ++++++------ rpc/args.go | 10 ++ xeth/xeth.go | 205 ++++++++++++++++++++++++++++++++++++- 6 files changed, 273 insertions(+), 203 deletions(-) diff --git a/core/filter.go b/core/filter.go index 487e82902e..f64ab4c07f 100644 --- a/core/filter.go +++ b/core/filter.go @@ -46,7 +46,7 @@ func NewFilter(eth Backend) *Filter { // SetOptions copies the filter options to the filter it self. The reason for this "silly" copy // is simply because named arguments in this case is extremely nice and readable. -func (self *Filter) SetOptions(options FilterOptions) { +func (self *Filter) SetOptions(options *FilterOptions) { self.earliest = options.Earliest self.latest = options.Latest self.skip = options.Skip diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go index cb75d7e1a0..4406372dbe 100644 --- a/event/filter/eth_filter.go +++ b/event/filter/eth_filter.go @@ -48,7 +48,9 @@ func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { func (self *FilterManager) UninstallFilter(id int) { self.filterMu.Lock() defer self.filterMu.Unlock() - delete(self.filters, id) + if _, ok := self.filters[id]; ok { + delete(self.filters, id) + } } // GetFilter retrieves a filter installed using InstallFilter. diff --git a/rpc/api.go b/rpc/api.go index 4f86e703da..85f798c7c7 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -7,7 +7,6 @@ import ( "path" "strings" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -15,15 +14,13 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/xeth" ) var ( - defaultGasPrice = big.NewInt(150000000000) - defaultGas = big.NewInt(500000) - filterTickerTime = 5 * time.Minute + defaultGasPrice = big.NewInt(150000000000) + defaultGas = big.NewInt(500000) ) type EthereumApi struct { @@ -31,17 +28,9 @@ type EthereumApi struct { xethMu sync.RWMutex mux *event.TypeMux - quit chan struct{} - filterManager *filter.FilterManager - - logMut sync.RWMutex - logs map[int]*logFilter - - messagesMut sync.RWMutex - messages map[int]*whisperFilter - // Register keeps a list of accounts and transaction data - regmut sync.Mutex - register map[string][]*NewTxArgs + // // Register keeps a list of accounts and transaction data + // regmut sync.Mutex + // register map[string][]*NewTxArgs db common.Database } @@ -49,16 +38,10 @@ type EthereumApi struct { func NewEthereumApi(eth *xeth.XEth, dataDir string) *EthereumApi { db, _ := ethdb.NewLDBDatabase(path.Join(dataDir, "dapps")) api := &EthereumApi{ - eth: eth, - mux: eth.Backend().EventMux(), - quit: make(chan struct{}), - filterManager: filter.NewFilterManager(eth.Backend().EventMux()), - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - db: db, + eth: eth, + mux: eth.Backend().EventMux(), + db: db, } - go api.filterManager.Start() - go api.start() return api } @@ -85,39 +68,6 @@ func (self *EthereumApi) getStateWithNum(num int64) *xeth.State { return self.xethWithStateNum(num).State() } -func (self *EthereumApi) start() { - timer := time.NewTicker(2 * time.Second) -done: - for { - select { - case <-timer.C: - self.logMut.Lock() - self.messagesMut.Lock() - for id, filter := range self.logs { - if time.Since(filter.timeout) > filterTickerTime { - self.filterManager.UninstallFilter(id) - delete(self.logs, id) - } - } - - for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { - self.xeth().Whisper().Unwatch(id) - delete(self.messages, id) - } - } - self.messagesMut.Unlock() - self.logMut.Unlock() - case <-self.quit: - break done - } - } -} - -func (self *EthereumApi) stop() { - close(self.quit) -} - // func (self *EthereumApi) Register(args string, reply *interface{}) error { // self.regmut.Lock() // defer self.regmut.Unlock() @@ -149,91 +99,43 @@ func (self *EthereumApi) stop() { // } func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error { - var id int - filter := core.NewFilter(self.xeth().Backend()) - filter.SetOptions(toFilterOptions(args)) - filter.LogsCallback = func(logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() - - self.logs[id].add(logs...) - } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} - + opts := toFilterOptions(args) + id := self.xeth().RegisterFilter(opts) *reply = common.ToHex(big.NewInt(int64(id)).Bytes()) return nil } func (self *EthereumApi) UninstallFilter(id int, reply *interface{}) error { - if _, ok := self.logs[id]; ok { - delete(self.logs, id) - } + *reply = self.xeth().UninstallFilter(id) - self.filterManager.UninstallFilter(id) - *reply = true return nil } func (self *EthereumApi) NewFilterString(args *FilterStringArgs, reply *interface{}) error { - var id int - filter := core.NewFilter(self.xeth().Backend()) - - callback := func(block *types.Block, logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() - - for _, log := range logs { - self.logs[id].add(log) - } - self.logs[id].add(&state.StateLog{}) - } - - switch args.Word { - case "pending": - filter.PendingCallback = callback - case "latest": - filter.BlockCallback = callback - default: - return NewValidationError("Word", "Must be `latest` or `pending`") + if err := args.requirements(); err != nil { + return err } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + id := self.xeth().NewFilterString(args.Word) *reply = common.ToHex(big.NewInt(int64(id)).Bytes()) - return nil } func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { - self.logMut.Lock() - defer self.logMut.Unlock() - - if self.logs[id] != nil { - *reply = NewLogsRes(self.logs[id].get()) - } - + *reply = NewLogsRes(self.xeth().FilterChanged(id)) return nil } func (self *EthereumApi) Logs(id int, reply *interface{}) error { - self.logMut.Lock() - defer self.logMut.Unlock() - - filter := self.filterManager.GetFilter(id) - if filter != nil { - *reply = NewLogsRes(filter.Find()) - } + *reply = NewLogsRes(self.xeth().Logs(id)) return nil } func (self *EthereumApi) AllLogs(args *FilterOptions, reply *interface{}) error { - filter := core.NewFilter(self.xeth().Backend()) - filter.SetOptions(toFilterOptions(args)) - - *reply = NewLogsRes(filter.Find()) + opts := toFilterOptions(args) + *reply = NewLogsRes(self.xeth().AllLogs(opts)) return nil } @@ -385,36 +287,22 @@ func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error { // } func (p *EthereumApi) NewWhisperFilter(args *WhisperFilterArgs, reply *interface{}) error { - var id int opts := new(xeth.Options) opts.From = args.From opts.To = args.To opts.Topics = args.Topics - opts.Fn = func(msg xeth.WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) - } - id = p.xeth().Whisper().Watch(opts) - p.messages[id] = &whisperFilter{timeout: time.Now()} + id := p.xeth().NewWhisperFilter(opts) *reply = common.ToHex(big.NewInt(int64(id)).Bytes()) return nil } func (p *EthereumApi) UninstallWhisperFilter(id int, reply *interface{}) error { - delete(p.messages, id) - *reply = true + *reply = p.xeth().UninstallWhisperFilter(id) return nil } func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() - - if self.messages[id] != nil { - *reply = self.messages[id].get() - } - + *reply = self.xeth().MessagesChanged(id) return nil } @@ -835,7 +723,7 @@ func (self *EthereumApi) xeth() *xeth.XEth { return self.eth } -func toFilterOptions(options *FilterOptions) core.FilterOptions { +func toFilterOptions(options *FilterOptions) *core.FilterOptions { var opts core.FilterOptions // Convert optional address slice/string to byte slice @@ -868,38 +756,5 @@ func toFilterOptions(options *FilterOptions) core.FilterOptions { } opts.Topics = topics - return opts -} - -type whisperFilter struct { - messages []xeth.WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []xeth.WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - -type logFilter struct { - logs state.Logs - timeout time.Time - id int -} - -func (l *logFilter) add(logs ...state.Log) { - l.logs = append(l.logs, logs...) -} - -func (l *logFilter) get() state.Logs { - l.timeout = time.Now() - tmp := l.logs - l.logs = nil - return tmp + return &opts } diff --git a/rpc/api_test.go b/rpc/api_test.go index ec03822c5b..727ade007d 100644 --- a/rpc/api_test.go +++ b/rpc/api_test.go @@ -2,9 +2,9 @@ package rpc import ( "encoding/json" - "sync" + // "sync" "testing" - "time" + // "time" ) func TestWeb3Sha3(t *testing.T) { @@ -24,33 +24,33 @@ func TestWeb3Sha3(t *testing.T) { } } -func TestFilterClose(t *testing.T) { - t.Skip() - api := &EthereumApi{ - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - quit: make(chan struct{}), - } - - filterTickerTime = 1 - api.logs[0] = &logFilter{} - api.messages[0] = &whisperFilter{} - var wg sync.WaitGroup - wg.Add(1) - go api.start() - go func() { - select { - case <-time.After(500 * time.Millisecond): - api.stop() - wg.Done() - } - }() - wg.Wait() - if len(api.logs) != 0 { - t.Error("expected logs to be empty") - } - - if len(api.messages) != 0 { - t.Error("expected messages to be empty") - } -} +// func TestFilterClose(t *testing.T) { +// t.Skip() +// api := &EthereumApi{ +// logs: make(map[int]*logFilter), +// messages: make(map[int]*whisperFilter), +// quit: make(chan struct{}), +// } + +// filterTickerTime = 1 +// api.logs[0] = &logFilter{} +// api.messages[0] = &whisperFilter{} +// var wg sync.WaitGroup +// wg.Add(1) +// go api.start() +// go func() { +// select { +// case <-time.After(500 * time.Millisecond): +// api.stop() +// wg.Done() +// } +// }() +// wg.Wait() +// if len(api.logs) != 0 { +// t.Error("expected logs to be empty") +// } + +// if len(api.messages) != 0 { +// t.Error("expected messages to be empty") +// } +// } diff --git a/rpc/args.go b/rpc/args.go index ab1c405850..bd6be5a0f3 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -609,6 +609,16 @@ func (args *FilterStringArgs) UnmarshalJSON(b []byte) (err error) { return nil } +func (args *FilterStringArgs) requirements() error { + switch args.Word { + case "latest", "pending": + break + default: + return NewValidationError("Word", "Must be `latest` or `pending`") + } + return nil +} + type FilterIdArgs struct { Id int } diff --git a/xeth/xeth.go b/xeth/xeth.go index 115bd787ae..922fce8f11 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" "math/big" + "sync" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -13,13 +15,17 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/whisper" ) -var pipelogger = logger.NewLogger("XETH") +var ( + pipelogger = logger.NewLogger("XETH") + filterTickerTime = 5 * time.Minute +) // to resolve the import cycle type Backend interface { @@ -71,6 +77,15 @@ type XEth struct { whisper *Whisper frontend Frontend + + quit chan struct{} + filterManager *filter.FilterManager + + logMut sync.RWMutex + logs map[int]*logFilter + + messagesMut sync.RWMutex + messages map[int]*whisperFilter } // dummyFrontend is a non-interactive frontend that allows all @@ -90,15 +105,55 @@ func New(eth Backend, frontend Frontend) *XEth { chainManager: eth.ChainManager(), accountManager: eth.AccountManager(), whisper: NewWhisper(eth.Whisper()), + quit: make(chan struct{}), + filterManager: filter.NewFilterManager(eth.EventMux()), frontend: frontend, + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), } if frontend == nil { xeth.frontend = dummyFrontend{} } xeth.state = NewState(xeth, xeth.chainManager.TransState()) + go xeth.start() + go xeth.filterManager.Start() + return xeth } +func (self *XEth) start() { + timer := time.NewTicker(2 * time.Second) +done: + for { + select { + case <-timer.C: + self.logMut.Lock() + self.messagesMut.Lock() + for id, filter := range self.logs { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.logs, id) + } + } + + for id, filter := range self.messages { + if time.Since(filter.timeout) > filterTickerTime { + self.Whisper().Unwatch(id) + delete(self.messages, id) + } + } + self.messagesMut.Unlock() + self.logMut.Unlock() + case <-self.quit: + break done + } + } +} + +func (self *XEth) stop() { + close(self.quit) +} + func (self *XEth) Backend() Backend { return self.eth } func (self *XEth) WithState(statedb *state.StateDB) *XEth { xeth := &XEth{ @@ -241,6 +296,121 @@ func (self *XEth) SecretToAddress(key string) string { return common.ToHex(pair.Address()) } +func (self *XEth) RegisterFilter(args *core.FilterOptions) int { + var id int + filter := core.NewFilter(self.Backend()) + filter.SetOptions(args) + filter.LogsCallback = func(logs state.Logs) { + self.logMut.Lock() + defer self.logMut.Unlock() + + self.logs[id].add(logs...) + } + id = self.filterManager.InstallFilter(filter) + self.logs[id] = &logFilter{timeout: time.Now()} + + return id +} + +func (self *XEth) UninstallFilter(id int) bool { + if _, ok := self.logs[id]; ok { + delete(self.logs, id) + self.filterManager.UninstallFilter(id) + return true + } + + return false +} + +func (self *XEth) NewFilterString(word string) int { + var id int + filter := core.NewFilter(self.Backend()) + + callback := func(block *types.Block, logs state.Logs) { + self.logMut.Lock() + defer self.logMut.Unlock() + + for _, log := range logs { + self.logs[id].add(log) + } + self.logs[id].add(&state.StateLog{}) + } + + switch word { + case "pending": + filter.PendingCallback = callback + case "latest": + filter.BlockCallback = callback + } + + id = self.filterManager.InstallFilter(filter) + self.logs[id] = &logFilter{timeout: time.Now()} + + return id +} + +func (self *XEth) FilterChanged(id int) state.Logs { + self.logMut.Lock() + defer self.logMut.Unlock() + + if self.logs[id] != nil { + return self.logs[id].get() + } + + return nil +} + +func (self *XEth) Logs(id int) state.Logs { + self.logMut.Lock() + defer self.logMut.Unlock() + + filter := self.filterManager.GetFilter(id) + if filter != nil { + return filter.Find() + } + + return nil +} + +func (self *XEth) AllLogs(args *core.FilterOptions) state.Logs { + filter := core.NewFilter(self.Backend()) + filter.SetOptions(args) + + return filter.Find() +} + +func (p *XEth) NewWhisperFilter(opts *Options) int { + var id int + opts.Fn = func(msg WhisperMessage) { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + p.messages[id].add(msg) // = append(p.messages[id], msg) + } + id = p.Whisper().Watch(opts) + p.messages[id] = &whisperFilter{timeout: time.Now()} + return id +} + +func (p *XEth) UninstallWhisperFilter(id int) bool { + if _, ok := p.messages[id]; ok { + delete(p.messages, id) + return true + } + + return false +} + +func (self *XEth) MessagesChanged(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + + if self.messages[id] != nil { + return self.messages[id].get() + } + + return nil +} + type KeyVal struct { Key string `json:"key"` Value string `json:"value"` @@ -411,3 +581,36 @@ func (m callmsg) GasPrice() *big.Int { return m.gasPrice } func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } + +type whisperFilter struct { + messages []WhisperMessage + timeout time.Time + id int +} + +func (w *whisperFilter) add(msgs ...WhisperMessage) { + w.messages = append(w.messages, msgs...) +} +func (w *whisperFilter) get() []WhisperMessage { + w.timeout = time.Now() + tmp := w.messages + w.messages = nil + return tmp +} + +type logFilter struct { + logs state.Logs + timeout time.Time + id int +} + +func (l *logFilter) add(logs ...state.Log) { + l.logs = append(l.logs, logs...) +} + +func (l *logFilter) get() state.Logs { + l.timeout = time.Now() + tmp := l.logs + l.logs = nil + return tmp +}