diff --git a/eth/backend.go b/eth/backend.go index 5b7dc64350..fb401a68d1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -14,7 +14,6 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/pow/ezp" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/whisper" ) @@ -75,7 +74,6 @@ func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.Ke clientIdentity: identity, blacklist: p2p.NewBlacklist(), eventMux: &event.TypeMux{}, - filters: make(map[int]*core.Filter), } eth.txPool = core.NewTxPool(eth) @@ -83,6 +81,7 @@ func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.Ke eth.blockManager = core.NewBlockManager(eth) eth.chainManager.SetProcessor(eth.blockManager) eth.whisper = whisper.New() + eth.filterManager = filter.NewFilterManager(eth.EventMux()) hasBlock := eth.chainManager.HasBlock insertChain := eth.chainManager.InsertChain @@ -164,8 +163,7 @@ func (s *Ethereum) Start(seed bool) error { } s.blockPool.Start() s.whisper.Start() - - go s.filterLoop() + s.filterManager.Start() // broadcast transactions s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) @@ -267,58 +265,9 @@ func saveProtocolVersion(db ethutil.Database) { } } -// InstallFilter adds filter for blockchain events. -// The filter's callbacks will run for matching blocks and messages. -// The filter should not be modified after it has been installed. +// XXX Refactor me & MOVE func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) { - self.filterMu.Lock() - id = self.filterId - self.filters[id] = filter - self.filterId++ - self.filterMu.Unlock() - return id -} - -func (self *Ethereum) UninstallFilter(id int) { - self.filterMu.Lock() - delete(self.filters, id) - self.filterMu.Unlock() -} - -// GetFilter retrieves a filter installed using InstallFilter. -// The filter may not be modified. -func (self *Ethereum) GetFilter(id int) *core.Filter { - self.filterMu.RLock() - defer self.filterMu.RUnlock() - return self.filters[id] -} - -func (self *Ethereum) filterLoop() { - // Subscribe to events - events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) - for event := range events.Chan() { - switch event.(type) { - case core.NewBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - e := event.(core.NewBlockEvent) - filter.BlockCallback(e.Block) - } - } - self.filterMu.RUnlock() - case state.Messages: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - e := event.(state.Messages) - msgs := filter.FilterMessages(e) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } - } - } - self.filterMu.RUnlock() - } - } + return self.filterManager.InstallFilter(filter) } +func (self *Ethereum) UninstallFilter(id int) { self.filterManager.UninstallFilter(id) } +func (self *Ethereum) GetFilter(id int) *core.Filter { return self.filterManager.GetFilter(id) } diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go new file mode 100644 index 0000000000..1a9a88173a --- /dev/null +++ b/event/filter/old_filter.go @@ -0,0 +1,94 @@ +// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring +package filter + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/state" +) + +type FilterManager struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*core.Filter + + quit chan struct{} +} + +func NewFilterManager(mux *event.TypeMux) *FilterManager { + return &FilterManager{ + eventMux: mux, + filters: make(map[int]*core.Filter), + } +} + +func (self *FilterManager) Start() { + go self.filterLoop() +} + +func (self *FilterManager) Stop() { + close(self.quit) +} + +func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id +} + +func (self *FilterManager) UninstallFilter(id int) { + self.filterMu.Lock() + delete(self.filters, id) + self.filterMu.Unlock() +} + +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. +func (self *FilterManager) GetFilter(id int) *core.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() + return self.filters[id] +} + +func (self *FilterManager) filterLoop() { + // Subscribe to events + events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) + +out: + for { + select { + case <-self.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case state.Messages: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(event) + if len(msgs) > 0 { + filter.MessageCallback(msgs) + } + } + } + self.filterMu.RUnlock() + } + } + } +}