whisper: message filtering optimized

pull/16210/head
Vlad 7 years ago
parent 4c845bdc27
commit 014d8d9837
  1. 66
      whisper/whisperv6/filter.go

@ -35,6 +35,7 @@ type Filter struct {
PoW float64 // Proof of work as described in the Whisper spec PoW float64 // Proof of work as described in the Whisper spec
AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
Messages map[common.Hash]*ReceivedMessage Messages map[common.Hash]*ReceivedMessage
mutex sync.RWMutex mutex sync.RWMutex
@ -42,16 +43,21 @@ type Filter struct {
// Filters represents a collection of filters // Filters represents a collection of filters
type Filters struct { type Filters struct {
watchers map[string]*Filter watchers map[string]*Filter
whisper *Whisper topicMatcher map[TopicType]map[*Filter]struct{}
mutex sync.RWMutex allTopicsMatcher map[*Filter]struct{}
whisper *Whisper
mutex sync.RWMutex
} }
// NewFilters returns a newly created filter collection // NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters { func NewFilters(w *Whisper) *Filters {
return &Filters{ return &Filters{
watchers: make(map[string]*Filter), watchers: make(map[string]*Filter),
whisper: w, topicMatcher: make(map[TopicType]map[*Filter]struct{}),
allTopicsMatcher: make(map[*Filter]struct{}),
whisper: w,
} }
} }
@ -81,7 +87,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym) watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
} }
watcher.id = id
fs.watchers[id] = watcher fs.watchers[id] = watcher
fs.addTopicMatcher(watcher)
return id, err return id, err
} }
@ -91,12 +99,49 @@ func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock() fs.mutex.Lock()
defer fs.mutex.Unlock() defer fs.mutex.Unlock()
if fs.watchers[id] != nil { if fs.watchers[id] != nil {
fs.removeFromTopicMatchers(fs.watchers[id])
delete(fs.watchers, id) delete(fs.watchers, id)
return true return true
} }
return false return false
} }
// addTopicMatcher adds a filter to the topic matchers
func (fs *Filters) addTopicMatcher(watcher *Filter) {
if len(watcher.Topics) == 0 {
fs.allTopicsMatcher[watcher] = struct{}{}
} else {
for _, t := range watcher.Topics {
topic := BytesToTopic(t)
if fs.topicMatcher[topic] == nil {
fs.topicMatcher[topic] = make(map[*Filter]struct{})
}
fs.topicMatcher[topic][watcher] = struct{}{}
}
}
}
// removeFromTopicMatchers removes a filter from the topic matchers
func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
delete(fs.allTopicsMatcher, watcher)
for _, topic := range watcher.Topics {
delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
}
}
// getWatchersByTopic returns a slice containing the filters that
// match a specific topic
func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
res := make([]*Filter, 0, len(fs.allTopicsMatcher))
for watcher, _ := range fs.allTopicsMatcher {
res = append(res, watcher)
}
for watcher, _ := range fs.topicMatcher[topic] {
res = append(res, watcher)
}
return res
}
// Get returns a filter from the collection with a specific ID // Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter { func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock() fs.mutex.RLock()
@ -112,11 +157,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
fs.mutex.RLock() fs.mutex.RLock()
defer fs.mutex.RUnlock() defer fs.mutex.RUnlock()
i := -1 // only used for logging info candidates := fs.getWatchersByTopic(env.Topic)
for _, watcher := range fs.watchers { for _, watcher := range candidates {
i++
if p2pMessage && !watcher.AllowP2P { if p2pMessage && !watcher.AllowP2P {
log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i)) log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
continue continue
} }
@ -128,10 +172,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match { if match {
msg = env.Open(watcher) msg = env.Open(watcher)
if msg == nil { if msg == nil {
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i) log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
} }
} else { } else {
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i) log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
} }
} }

Loading…
Cancel
Save