diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go index 3dddb69539..0e8490b419 100644 --- a/whisper/whisperv6/api.go +++ b/whisper/whisperv6/api.go @@ -116,12 +116,17 @@ func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32) return true, api.w.SetMaxMessageSize(size) } -// SetMinPow sets the minimum PoW for a message before it is accepted. +// SetMinPow sets the minimum PoW, and notifies the peers. func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) { return true, api.w.SetMinimumPoW(pow) } -// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages. +// SetBloomFilter sets the new value of bloom filter, and notifies the peers. +func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) { + return true, api.w.SetBloomFilter(bloom) +} + +// MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages. // Note: This function is not adding new nodes, the node needs to exists as a peer. func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) { n, err := discover.ParseNode(enode) diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go index 2a4911d65a..da1b4ee5ba 100644 --- a/whisper/whisperv6/doc.go +++ b/whisper/whisperv6/doc.go @@ -35,7 +35,6 @@ import ( ) const ( - EnvelopeVersion = uint64(0) ProtocolVersion = uint64(6) ProtocolVersionStr = "6.0" ProtocolName = "shh" @@ -52,11 +51,14 @@ const ( paddingMask = byte(3) signatureFlag = byte(4) - TopicLength = 4 - signatureLength = 65 - aesKeyLength = 32 - AESNonceLength = 12 - keyIdSize = 32 + TopicLength = 4 // in bytes + signatureLength = 65 // in bytes + aesKeyLength = 32 // in bytes + AESNonceLength = 12 // in bytes + keyIdSize = 32 // in bytes + bloomFilterSize = 64 // in bytes + + EnvelopeHeaderLength = 20 MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message. DefaultMaxMessageSize = uint32(1024 * 1024) @@ -68,10 +70,8 @@ const ( expirationCycle = time.Second transmissionCycle = 300 * time.Millisecond - DefaultTTL = 50 // seconds - SynchAllowance = 10 // seconds - - EnvelopeHeaderLength = 20 + DefaultTTL = 50 // seconds + DefaultSyncAllowance = 10 // seconds ) type unknownVersionError uint64 diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go index 676df669bf..9ed712b934 100644 --- a/whisper/whisperv6/envelope.go +++ b/whisper/whisperv6/envelope.go @@ -42,9 +42,11 @@ type Envelope struct { Data []byte Nonce uint64 - pow float64 // Message-specific PoW as described in the Whisper specification. - hash common.Hash // Cached hash of the envelope to avoid rehashing every time. - // Don't access hash directly, use Hash() function instead. + pow float64 // Message-specific PoW as described in the Whisper specification. + + // the following variables should not be accessed directly, use the corresponding function instead: Hash(), Bloom() + hash common.Hash // Cached hash of the envelope to avoid rehashing every time. + bloom []byte } // size returns the size of envelope as it is sent (i.e. public fields only) @@ -227,3 +229,30 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) { } return msg } + +// Bloom maps 4-bytes Topic into 64-byte bloom filter with 3 bits set (at most). +func (e *Envelope) Bloom() []byte { + if e.bloom == nil { + e.bloom = TopicToBloom(e.Topic) + } + return e.bloom +} + +// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes) +func TopicToBloom(topic TopicType) []byte { + b := make([]byte, bloomFilterSize) + var index [3]int + for j := 0; j < 3; j++ { + index[j] = int(topic[j]) + if (topic[3] & (1 << uint(j))) != 0 { + index[j] += 256 + } + } + + for j := 0; j < 3; j++ { + byteIndex := index[j] / 8 + bitIndex := index[j] % 8 + b[byteIndex] = (1 << uint(bitIndex)) + } + return b +} diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index 65e0c77b0c..08071c0f77 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -36,6 +36,7 @@ type Peer struct { trusted bool powRequirement float64 + bloomFilter []byte // may contain nil in case of full node known *set.Set // Messages already known by the peer to avoid wasting bandwidth @@ -74,8 +75,12 @@ func (p *Peer) handshake() error { // Send the handshake status message asynchronously errc := make(chan error, 1) go func() { - errc <- p2p.Send(p.ws, statusCode, ProtocolVersion) + pow := p.host.MinPow() + powConverted := math.Float64bits(pow) + bloom := p.host.BloomFilter() + errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom) }() + // Fetch the remote status packet and verify protocol match packet, err := p.ws.ReadMsg() if err != nil { @@ -85,14 +90,42 @@ func (p *Peer) handshake() error { return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code) } s := rlp.NewStream(packet.Payload, uint64(packet.Size)) - peerVersion, err := s.Uint() + _, err = s.List() if err != nil { return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err) } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", p.ID(), err) + } if peerVersion != ProtocolVersion { return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion) } - // Wait until out own status is consumed too + + // only version is mandatory, subsequent parameters are optional + powRaw, err := s.Uint() + if err == nil { + pow := math.Float64frombits(powRaw) + if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { + return fmt.Errorf("peer [%x] sent bad status message: invalid pow", p.ID()) + } + p.powRequirement = pow + + var bloom []byte + err = s.Decode(&bloom) + if err == nil { + sz := len(bloom) + if sz != bloomFilterSize && sz != 0 { + return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz) + } + if isFullNode(bloom) { + p.bloomFilter = nil + } else { + p.bloomFilter = bloom + } + } + } + if err := <-errc; err != nil { return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err) } @@ -156,7 +189,7 @@ func (p *Peer) broadcast() error { envelopes := p.host.Envelopes() bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes { - if !p.marked(envelope) && envelope.PoW() >= p.powRequirement { + if !p.marked(envelope) && envelope.PoW() >= p.powRequirement && p.bloomMatch(envelope) { bundle = append(bundle, envelope) } } @@ -186,3 +219,16 @@ func (p *Peer) notifyAboutPowRequirementChange(pow float64) error { i := math.Float64bits(pow) return p2p.Send(p.ws, powRequirementCode, i) } + +func (p *Peer) notifyAboutBloomFilterChange(bloom []byte) error { + return p2p.Send(p.ws, bloomFilterExCode, bloom) +} + +func (p *Peer) bloomMatch(env *Envelope) bool { + if p.bloomFilter == nil { + // no filter - full node, accepts all envelops + return true + } + + return bloomFilterMatch(p.bloomFilter, env.Bloom()) +} diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go index 599a479be4..8a65cb7143 100644 --- a/whisper/whisperv6/peer_test.go +++ b/whisper/whisperv6/peer_test.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/ecdsa" "fmt" + mrand "math/rand" "net" "sync" "testing" @@ -87,6 +88,9 @@ var nodes [NumNodes]*TestNode var sharedKey []byte = []byte("some arbitrary data here") var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0} var expectedMessage []byte = []byte("per rectum ad astra") +var masterBloomFilter []byte +var masterPow = 0.00000001 +var round int = 1 func TestSimulation(t *testing.T) { // create a chain of whisper nodes, @@ -104,8 +108,13 @@ func TestSimulation(t *testing.T) { // check if each node have received and decrypted exactly one message checkPropagation(t, true) - // send protocol-level messages (powRequirementCode) and check the new PoW requirement values - powReqExchange(t) + // check if Status message was correctly decoded + checkBloomFilterExchange(t) + checkPowExchange(t) + + // send new pow and bloom exchange messages + resetParams(t) + round++ // node #1 sends one expected (decryptable) message sendMsg(t, true, 1) @@ -113,18 +122,65 @@ func TestSimulation(t *testing.T) { // check if each node (except node #0) have received and decrypted exactly one message checkPropagation(t, false) + for i := 1; i < NumNodes; i++ { + time.Sleep(20 * time.Millisecond) + sendMsg(t, true, i) + } + + // check if corresponding protocol-level messages were correctly decoded + checkPowExchangeForNodeZero(t) + checkBloomFilterExchange(t) + stopServers() } +func resetParams(t *testing.T) { + // change pow only for node zero + masterPow = 7777777.0 + nodes[0].shh.SetMinimumPoW(masterPow) + + // change bloom for all nodes + masterBloomFilter = TopicToBloom(sharedTopic) + for i := 0; i < NumNodes; i++ { + nodes[i].shh.SetBloomFilter(masterBloomFilter) + } +} + +func initBloom(t *testing.T) { + masterBloomFilter = make([]byte, bloomFilterSize) + _, err := mrand.Read(masterBloomFilter) + if err != nil { + t.Fatalf("rand failed: %s.", err) + } + + msgBloom := TopicToBloom(sharedTopic) + masterBloomFilter = addBloom(masterBloomFilter, msgBloom) + for i := 0; i < 32; i++ { + masterBloomFilter[i] = 0xFF + } + + if !bloomFilterMatch(masterBloomFilter, msgBloom) { + t.Fatalf("bloom mismatch on initBloom.") + } +} + func initialize(t *testing.T) { + initBloom(t) + var err error ip := net.IPv4(127, 0, 0, 1) port0 := 30303 for i := 0; i < NumNodes; i++ { var node TestNode + b := make([]byte, bloomFilterSize) + copy(b, masterBloomFilter) node.shh = New(&DefaultConfig) - node.shh.SetMinimumPowTest(0.00000001) + node.shh.SetMinimumPoW(masterPow) + node.shh.SetBloomFilter(b) + if !bytes.Equal(node.shh.BloomFilter(), masterBloomFilter) { + t.Fatalf("bloom mismatch on init.") + } node.shh.Start(nil) topics := make([]TopicType, 0) topics = append(topics, sharedTopic) @@ -206,7 +262,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) { for i := first; i < NumNodes; i++ { f := nodes[i].shh.GetFilter(nodes[i].filerId) if f == nil { - t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i) + t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerId, i, round) } mail := f.Retrieve() @@ -332,34 +388,43 @@ func TestPeerBasic(t *testing.T) { } } -func powReqExchange(t *testing.T) { +func checkPowExchangeForNodeZero(t *testing.T) { + cnt := 0 for i, node := range nodes { for peer := range node.shh.peers { - if peer.powRequirement > 1000.0 { - t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement) + if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) { + cnt++ + if peer.powRequirement != masterPow { + t.Fatalf("node %d: failed to set the new pow requirement.", i) + } } } } + if cnt == 0 { + t.Fatalf("no matching peers found.") + } +} - const pow float64 = 7777777.0 - nodes[0].shh.SetMinimumPoW(pow) - - // wait until all the messages are delivered - time.Sleep(64 * time.Millisecond) - - cnt := 0 +func checkPowExchange(t *testing.T) { for i, node := range nodes { for peer := range node.shh.peers { - if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) { - cnt++ - if peer.powRequirement != pow { - t.Fatalf("node %d: failed to set the new pow requirement.", i) + if peer.peer.ID() != discover.PubkeyID(&nodes[0].id.PublicKey) { + if peer.powRequirement != masterPow { + t.Fatalf("node %d: failed to exchange pow requirement in round %d; expected %f, got %f", + i, round, masterPow, peer.powRequirement) } } } } +} - if cnt == 0 { - t.Fatalf("no matching peers found.") +func checkBloomFilterExchange(t *testing.T) { + for i, node := range nodes { + for peer := range node.shh.peers { + if !bytes.Equal(peer.bloomFilter, masterBloomFilter) { + t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got", + i, round, masterBloomFilter, peer.bloomFilter) + } + } } } diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 492591486b..bc89aadccd 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -48,9 +48,12 @@ type Statistics struct { } const ( - minPowIdx = iota // Minimal PoW required by the whisper node - maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node - overflowIdx = iota // Indicator of message queue overflow + maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node + overflowIdx // Indicator of message queue overflow + minPowIdx // Minimal PoW required by the whisper node + minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time + bloomFilterIdx // Bloom filter for topics of interest for this node + bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time ) // Whisper represents a dark communication interface through the Ethereum @@ -76,7 +79,7 @@ type Whisper struct { settings syncmap.Map // holds configuration settings that can be dynamically changed - reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages + syncAllowance int // maximum time in seconds allowed to process the whisper-related messages statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node @@ -91,15 +94,15 @@ func New(cfg *Config) *Whisper { } whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), - quit: make(chan struct{}), - reactionAllowance: SynchAllowance, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + syncAllowance: DefaultSyncAllowance, } whisper.filters = NewFilters(whisper) @@ -126,11 +129,55 @@ func New(cfg *Config) *Whisper { return whisper } +// MinPow returns the PoW value required by this node. func (w *Whisper) MinPow() float64 { - val, _ := w.settings.Load(minPowIdx) + val, exist := w.settings.Load(minPowIdx) + if !exist || val == nil { + return DefaultMinimumPoW + } + v, ok := val.(float64) + if !ok { + log.Error("Error loading minPowIdx, using default") + return DefaultMinimumPoW + } + return v +} + +// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited +// time after PoW was changed. If sufficient time have elapsed or no change of PoW +// have ever occurred, the return value will be the same as return value of MinPow(). +func (w *Whisper) MinPowTolerance() float64 { + val, exist := w.settings.Load(minPowToleranceIdx) + if !exist || val == nil { + return DefaultMinimumPoW + } return val.(float64) } +// BloomFilter returns the aggregated bloom filter for all the topics of interest. +// The nodes are required to send only messages that match the advertised bloom filter. +// If a message does not match the bloom, it will tantamount to spam, and the peer will +// be disconnected. +func (w *Whisper) BloomFilter() []byte { + val, exist := w.settings.Load(bloomFilterIdx) + if !exist || val == nil { + return nil + } + return val.([]byte) +} + +// BloomFilterTolerance returns the bloom filter which is tolerated for a limited +// time after new bloom was advertised to the peers. If sufficient time have elapsed +// or no change of bloom filter have ever occurred, the return value will be the same +// as return value of BloomFilter(). +func (w *Whisper) BloomFilterTolerance() []byte { + val, exist := w.settings.Load(bloomFilterToleranceIdx) + if !exist || val == nil { + return nil + } + return val.([]byte) +} + // MaxMessageSize returns the maximum accepted message size. func (w *Whisper) MaxMessageSize() uint32 { val, _ := w.settings.Load(maxMsgSizeIdx) @@ -180,18 +227,40 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error { return nil } +// SetBloomFilter sets the new bloom filter +func (w *Whisper) SetBloomFilter(bloom []byte) error { + if len(bloom) != bloomFilterSize { + return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) + } + + b := make([]byte, bloomFilterSize) + copy(b, bloom) + + w.settings.Store(bloomFilterIdx, b) + w.notifyPeersAboutBloomFilterChange(b) + + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(w.syncAllowance) * time.Second) + w.settings.Store(bloomFilterToleranceIdx, b) + }() + + return nil +} + // SetMinimumPoW sets the minimal PoW required by this node func (w *Whisper) SetMinimumPoW(val float64) error { if val < 0.0 { return fmt.Errorf("invalid PoW: %f", val) } + w.settings.Store(minPowIdx, val) w.notifyPeersAboutPowRequirementChange(val) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(w.reactionAllowance) * time.Second) - w.settings.Store(minPowIdx, val) + time.Sleep(time.Duration(w.syncAllowance) * time.Second) + w.settings.Store(minPowToleranceIdx, val) }() return nil @@ -199,21 +268,13 @@ func (w *Whisper) SetMinimumPoW(val float64) error { // SetMinimumPoW sets the minimal PoW in test environment func (w *Whisper) SetMinimumPowTest(val float64) { - w.notifyPeersAboutPowRequirementChange(val) w.settings.Store(minPowIdx, val) + w.notifyPeersAboutPowRequirementChange(val) + w.settings.Store(minPowToleranceIdx, val) } func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { - arr := make([]*Peer, len(w.peers)) - i := 0 - - w.peerMu.Lock() - for p := range w.peers { - arr[i] = p - i++ - } - w.peerMu.Unlock() - + arr := w.getPeers() for _, p := range arr { err := p.notifyAboutPowRequirementChange(pow) if err != nil { @@ -221,11 +282,37 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { err = p.notifyAboutPowRequirementChange(pow) } if err != nil { - log.Warn("oversized message received", "peer", p.ID(), "error", err) + log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err) + } + } +} + +func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { + arr := w.getPeers() + for _, p := range arr { + err := p.notifyAboutBloomFilterChange(bloom) + if err != nil { + // allow one retry + err = p.notifyAboutBloomFilterChange(bloom) + } + if err != nil { + log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err) } } } +func (w *Whisper) getPeers() []*Peer { + arr := make([]*Peer, len(w.peers)) + i := 0 + w.peerMu.Lock() + for p := range w.peers { + arr[i] = p + i++ + } + w.peerMu.Unlock() + return arr +} + // getPeer retrieves peer by ID func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { w.peerMu.Lock() @@ -459,7 +546,28 @@ func (w *Whisper) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Whisper) Subscribe(f *Filter) (string, error) { - return w.filters.Install(f) + s, err := w.filters.Install(f) + if err == nil { + w.updateBloomFilter(f) + } + return s, err +} + +// updateBloomFilter recalculates the new value of bloom filter, +// and informs the peers if necessary. +func (w *Whisper) updateBloomFilter(f *Filter) { + aggregate := make([]byte, bloomFilterSize) + for _, t := range f.Topics { + top := BytesToTopic(t) + b := TopicToBloom(top) + aggregate = addBloom(aggregate, b) + } + + if !bloomFilterMatch(w.BloomFilter(), aggregate) { + // existing bloom filter must be updated + aggregate = addBloom(w.BloomFilter(), aggregate) + w.SetBloomFilter(aggregate) + } } // GetFilter returns the filter by id. @@ -592,7 +700,21 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } p.powRequirement = f case bloomFilterExCode: - // to be implemented + var bloom []byte + err := packet.Decode(&bloom) + if err == nil && len(bloom) != bloomFilterSize { + err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) + } + + if err != nil { + log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid bloom filter exchange message") + } + if isFullNode(bloom) { + p.bloomFilter = nil + } else { + p.bloomFilter = bloom + } case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and @@ -633,7 +755,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { sent := envelope.Expiry - envelope.TTL if sent > now { - if sent-SynchAllowance > now { + if sent-DefaultSyncAllowance > now { return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) } else { // recalculate PoW, adjusted for the time difference, plus one second for latency @@ -642,7 +764,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } if envelope.Expiry < now { - if envelope.Expiry+SynchAllowance*2 < now { + if envelope.Expiry+DefaultSyncAllowance*2 < now { return false, fmt.Errorf("very old message") } else { log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) @@ -655,11 +777,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } if envelope.PoW() < wh.MinPow() { - log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error for now + // maybe the value was recently changed, and the peers did not adjust yet. + // in this case the previous value is retrieved by MinPowTolerance() + // for a short period of peer synchronization. + if envelope.PoW() < wh.MinPowTolerance() { + return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + } + } - // once the status message includes the PoW requirement, an error should be returned here: - //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) { + // maybe the value was recently changed, and the peers did not adjust yet. + // in this case the previous value is retrieved by BloomFilterTolerance() + // for a short period of peer synchronization. + if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) { + return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x", + envelope.Hash().Hex(), wh.BloomFilter(), envelope.Bloom(), envelope.Topic) + } } hash := envelope.Hash() @@ -897,3 +1030,40 @@ func GenerateRandomID() (id string, err error) { id = common.Bytes2Hex(buf) return id, err } + +func isFullNode(bloom []byte) bool { + if bloom == nil { + return true + } + for _, b := range bloom { + if b != 255 { + return false + } + } + return true +} + +func bloomFilterMatch(filter, sample []byte) bool { + if filter == nil { + // full node, accepts all messages + return true + } + + for i := 0; i < bloomFilterSize; i++ { + f := filter[i] + s := sample[i] + if (f | s) != f { + return false + } + } + + return true +} + +func addBloom(a, b []byte) []byte { + c := make([]byte, bloomFilterSize) + for i := 0; i < bloomFilterSize; i++ { + c[i] = a[i] | b[i] + } + return c +} diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go index b391a1161b..fa14acb1b1 100644 --- a/whisper/whisperv6/whisper_test.go +++ b/whisper/whisperv6/whisper_test.go @@ -843,3 +843,64 @@ func TestSymmetricSendKeyMismatch(t *testing.T) { t.Fatalf("received a message when keys weren't matching") } } + +func TestBloom(t *testing.T) { + topic := TopicType{0, 0, 255, 6} + b := TopicToBloom(topic) + x := make([]byte, bloomFilterSize) + x[0] = byte(1) + x[32] = byte(1) + x[bloomFilterSize-1] = byte(128) + if !bloomFilterMatch(x, b) || !bloomFilterMatch(b, x) { + t.Fatalf("bloom filter does not match the mask") + } + + _, err := mrand.Read(b) + if err != nil { + t.Fatalf("math rand error") + } + _, err = mrand.Read(x) + if err != nil { + t.Fatalf("math rand error") + } + if !bloomFilterMatch(b, b) { + t.Fatalf("bloom filter does not match self") + } + x = addBloom(x, b) + if !bloomFilterMatch(x, b) { + t.Fatalf("bloom filter does not match combined bloom") + } + if !isFullNode(nil) { + t.Fatalf("isFullNode did not recognize nil as full node") + } + x[17] = 254 + if isFullNode(x) { + t.Fatalf("isFullNode false positive") + } + for i := 0; i < bloomFilterSize; i++ { + b[i] = byte(255) + } + if !isFullNode(b) { + t.Fatalf("isFullNode false negative") + } + if bloomFilterMatch(x, b) { + t.Fatalf("bloomFilterMatch false positive") + } + if !bloomFilterMatch(b, x) { + t.Fatalf("bloomFilterMatch false negative") + } + + w := New(&DefaultConfig) + f := w.BloomFilter() + if f != nil { + t.Fatalf("wrong bloom on creation") + } + err = w.SetBloomFilter(x) + if err != nil { + t.Fatalf("failed to set bloom filter: %s", err) + } + f = w.BloomFilter() + if !bloomFilterMatch(f, x) || !bloomFilterMatch(x, f) { + t.Fatalf("retireved wrong bloom filter") + } +}