Official Go implementation of the Ethereum protocol
go-ethereum/swarm/pss/pss.go


// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pss
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
"errors"
"fmt"
"hash"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/storage"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)
const (
defaultPaddingByteSize = 16
DefaultMsgTTL = time.Second * 120
defaultDigestCacheTTL = time.Second * 10
defaultSymKeyCacheCapacity = 512
digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
defaultWhisperWorkTime = 3
defaultWhisperPoW = 0.0000000001
defaultMaxMsgSize = 1024 * 1024
defaultCleanInterval = time.Second * 60 * 10
defaultOutboxCapacity = 100000
pssProtocolName = "pss"
pssVersion = 2
hasherCount = 8
)
var (
addressLength = len(pot.Address{})
)
// cache is used for preventing backwards routing
// will also be instrumental in flood guard mechanism
// and mailbox implementation
type pssCacheEntry struct {
expiresAt time.Time
}
// abstraction to enable access to p2p.protocols.Peer.Send
type senderPeer interface {
Info() *p2p.PeerInfo
ID() enode.ID
Address() []byte
Send(context.Context, interface{}) error
}
// per-key peer related information
// member `protected` prevents garbage collection of the instance
type pssPeer struct {
lastSeen time.Time
address PssAddress
protected bool
}
// Pss configuration parameters
type PssParams struct {
MsgTTL time.Duration
CacheTTL time.Duration
privateKey *ecdsa.PrivateKey
SymKeyCacheCapacity int
AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption
}
// Sane defaults for Pss
func NewPssParams() *PssParams {
return &PssParams{
MsgTTL: DefaultMsgTTL,
CacheTTL: defaultDigestCacheTTL,
SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
}
}
func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
params.privateKey = privatekey
return params
}
// Top-level pss object. It takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
//
// Implements node.Service
type Pss struct {
*network.Kademlia // we can get the Kademlia address from this
privateKey *ecdsa.PrivateKey // pss can have its own independent key
w *whisper.Whisper // key and encryption backend
auxAPIs []rpc.API // builtins (handshake, test) can add APIs
// sending and forwarding
fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
fwdPoolMu sync.RWMutex
fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
fwdCacheMu sync.RWMutex
cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
msgTTL time.Duration
paddingByteSize int
capstring string
outbox chan *PssMsg
// keys and peers
pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
pubKeyPoolMu sync.RWMutex
symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
symKeyPoolMu sync.RWMutex
symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
symKeyDecryptCacheCapacity int // max amount of symkeys to keep.
// message handling
handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Register()
handlersMu sync.RWMutex
hashPool sync.Pool
topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)
// process
quitC chan struct{}
}
func (p *Pss) String() string {
return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
}
// Creates a new Pss instance.
//
// In addition to params, it takes a swarm network Kademlia.
// params must carry a private key (see WithPrivateKey), otherwise an error is returned.
func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
if params.privateKey == nil {
return nil, errors.New("missing private key for pss")
}
cap := p2p.Cap{
Name: pssProtocolName,
Version: pssVersion,
}
ps := &Pss{
Kademlia: k,
privateKey: params.privateKey,
w: whisper.New(&whisper.DefaultConfig),
quitC: make(chan struct{}),
fwdPool: make(map[string]*protocols.Peer),
fwdCache: make(map[pssDigest]pssCacheEntry),
cacheTTL: params.CacheTTL,
msgTTL: params.MsgTTL,
paddingByteSize: defaultPaddingByteSize,
capstring: cap.String(),
outbox: make(chan *PssMsg, defaultOutboxCapacity),
pubKeyPool: make(map[string]map[Topic]*pssPeer),
symKeyPool: make(map[string]map[Topic]*pssPeer),
symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
handlers: make(map[Topic]map[*handler]bool),
topicHandlerCaps: make(map[Topic]*handlerCaps),
hashPool: sync.Pool{
New: func() interface{} {
return sha3.NewKeccak256()
},
},
}
for i := 0; i < hasherCount; i++ {
hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
ps.hashPool.Put(hashfunc)
}
return ps, nil
}
/////////////////////////////////////////////////////////////////////
// SECTION: node.Service interface
/////////////////////////////////////////////////////////////////////
func (p *Pss) Start(srv *p2p.Server) error {
go func() {
ticker := time.NewTicker(defaultCleanInterval)
cacheTicker := time.NewTicker(p.cacheTTL)
defer ticker.Stop()
defer cacheTicker.Stop()
for {
select {
case <-cacheTicker.C:
p.cleanFwdCache()
case <-ticker.C:
p.cleanKeys()
case <-p.quitC:
return
}
}
}()
go func() {
for {
select {
case msg := <-p.outbox:
err := p.forward(msg)
if err != nil {
log.Error(err.Error())
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
}
case <-p.quitC:
return
}
}
}()
log.Info("Started Pss")
log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
return nil
}
func (p *Pss) Stop() error {
log.Info("Pss shutting down")
close(p.quitC)
return nil
}
var pssSpec = &protocols.Spec{
Name: pssProtocolName,
Version: pssVersion,
MaxMsgSize: defaultMaxMsgSize,
Messages: []interface{}{
PssMsg{},
},
}
func (p *Pss) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
Name: pssSpec.Name,
Version: pssSpec.Version,
Length: pssSpec.Length(),
Run: p.Run,
},
}
}
func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
pp := protocols.NewPeer(peer, rw, pssSpec)
p.fwdPoolMu.Lock()
p.fwdPool[peer.Info().ID] = pp
p.fwdPoolMu.Unlock()
return pp.Run(p.handlePssMsg)
}
func (p *Pss) APIs() []rpc.API {
apis := []rpc.API{
{
Namespace: "pss",
Version: "1.0",
Service: NewAPI(p),
Public: true,
},
}
apis = append(apis, p.auxAPIs...)
return apis
}
// add API methods to the pss API
// must be run before node is started
func (p *Pss) addAPI(api rpc.API) {
p.auxAPIs = append(p.auxAPIs, api)
}
// Returns the swarm Kademlia address of the pss node
func (p *Pss) BaseAddr() []byte {
return p.Kademlia.BaseAddr()
}
// Returns the pss node's public key
func (p *Pss) PublicKey() *ecdsa.PublicKey {
return &p.privateKey.PublicKey
}
/////////////////////////////////////////////////////////////////////
// SECTION: Message handling
/////////////////////////////////////////////////////////////////////
// Links a handler function to a Topic
//
// All incoming messages with an envelope Topic matching the
// topic specified will be passed to the given Handler function.
//
// There may be an arbitrary number of handler functions per topic.
//
// Returns a deregister function which needs to be called to
// deregister the handler.
func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()
handlers := p.handlers[*topic]
if handlers == nil {
handlers = make(map[*handler]bool)
p.handlers[*topic] = handlers
log.Debug("registered handler", "caps", hndlr.caps)
}
if hndlr.caps == nil {
hndlr.caps = &handlerCaps{}
}
handlers[hndlr] = true
if _, ok := p.topicHandlerCaps[*topic]; !ok {
p.topicHandlerCaps[*topic] = &handlerCaps{}
}
if hndlr.caps.raw {
p.topicHandlerCaps[*topic].raw = true
}
if hndlr.caps.prox {
p.topicHandlerCaps[*topic].prox = true
}
return func() { p.deregister(topic, hndlr) }
}
func (p *Pss) deregister(topic *Topic, hndlr *handler) {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()
handlers := p.handlers[*topic]
// remove only the given handler; other handlers on the topic stay registered
delete(handlers, hndlr)
if len(handlers) == 0 {
delete(p.handlers, *topic)
}
// topic caps might have changed now that a handler is gone,
// so rebuild them from the handlers that remain
caps := &handlerCaps{}
for h := range handlers {
if h.caps.raw {
caps.raw = true
}
if h.caps.prox {
caps.prox = true
}
}
p.topicHandlerCaps[*topic] = caps
}
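// The register/deregister pattern can be illustrated in isolation. The sketch
// below is a simplified, lock-free model and not the pss code: the registry,
// caps and handler types are hypothetical, topics are plain strings, and the
// per-topic capabilities are kept as the union of the capabilities of the
// handlers currently registered.

```go
package main

import "fmt"

// caps and handler are simplified, hypothetical versions of handlerCaps and handler.
type caps struct{ raw, prox bool }

type handler struct {
	name string
	caps caps
}

// registry maps a topic to its handlers and to the union of their capabilities.
type registry struct {
	handlers  map[string]map[*handler]bool
	topicCaps map[string]caps
}

func newRegistry() *registry {
	return &registry{
		handlers:  make(map[string]map[*handler]bool),
		topicCaps: make(map[string]caps),
	}
}

// register adds h under topic and returns a function that removes it again.
func (r *registry) register(topic string, h *handler) func() {
	if r.handlers[topic] == nil {
		r.handlers[topic] = make(map[*handler]bool)
	}
	r.handlers[topic][h] = true
	r.recompute(topic)
	return func() { r.deregister(topic, h) }
}

// deregister removes only h; other handlers on the topic are left in place.
func (r *registry) deregister(topic string, h *handler) {
	delete(r.handlers[topic], h)
	if len(r.handlers[topic]) == 0 {
		delete(r.handlers, topic)
	}
	r.recompute(topic)
}

// recompute rebuilds the capability union from the handlers still registered.
func (r *registry) recompute(topic string) {
	var c caps
	for h := range r.handlers[topic] {
		c.raw = c.raw || h.caps.raw
		c.prox = c.prox || h.caps.prox
	}
	r.topicCaps[topic] = c
}

func main() {
	r := newRegistry()
	removeRaw := r.register("t", &handler{name: "raw", caps: caps{raw: true}})
	r.register("t", &handler{name: "plain"})
	fmt.Println(r.topicCaps["t"].raw, len(r.handlers["t"])) // true 2
	removeRaw()
	fmt.Println(r.topicCaps["t"].raw, len(r.handlers["t"])) // false 1
}
```

// Returning the removal function as a closure means the caller does not have
// to keep the topic and handler pointer around to undo the registration.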
// get all registered handlers for respective topics
func (p *Pss) getHandlers(topic Topic) map[*handler]bool {
p.handlersMu.RLock()
defer p.handlersMu.RUnlock()
return p.handlers[topic]
}
// Filters incoming messages for processing or forwarding.
// Check if address partially matches
// If yes, it CAN be for us, and we process it
// Only passes error to pss protocol handler if payload is not valid pssmsg
func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
pssmsg, ok := msg.(*PssMsg)
if !ok {
return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
}
log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
if int64(pssmsg.Expire) < time.Now().Unix() {
metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
return nil
}
if p.checkFwdCache(pssmsg) {
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
return nil
}
p.addFwdCache(pssmsg)
psstopic := Topic(pssmsg.Payload.Topic)
// raw is simplest handler contingency to check, so check that first
var isRaw bool
if pssmsg.isRaw() {
if _, ok := p.topicHandlerCaps[psstopic]; ok {
if !p.topicHandlerCaps[psstopic].raw {
log.Debug("No handler for raw message", "topic", psstopic)
return nil
}
}
isRaw = true
}
// check if we can be recipient:
// - no prox handler on message and partial address matches
// - prox handler on message and we are in prox regardless of partial address match
// store this result so we don't calculate again on every handler
var isProx bool
if _, ok := p.topicHandlerCaps[psstopic]; ok {
isProx = p.topicHandlerCaps[psstopic].prox
}
isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
if !isRecipient {
log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
return p.enqueue(pssmsg)
}
log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
if err := p.process(pssmsg, isRaw, isProx); err != nil {
qerr := p.enqueue(pssmsg)
if qerr != nil {
return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
}
}
return nil
}
// Entry point to processing a message for which the current node can be the intended recipient.
// Attempts symmetric and asymmetric decryption with stored keys.
// Dispatches message to all handlers matching the message topic
func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
var err error
var recvmsg *whisper.ReceivedMessage
var payload []byte
var from PssAddress
var asymmetric bool
var keyid string
var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)
envelope := pssmsg.Payload
psstopic := Topic(envelope.Topic)
if raw {
payload = pssmsg.Payload.Data
} else {
if pssmsg.isSym() {
keyFunc = p.processSym
} else {
asymmetric = true
keyFunc = p.processAsym
}
recvmsg, keyid, from, err = keyFunc(envelope)
if err != nil {
return errors.New("Decryption failed")
}
payload = recvmsg.Payload
}
if len(pssmsg.To) < addressLength {
if err := p.enqueue(pssmsg); err != nil {
return err
}
}
p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)
return nil
}
func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
handlers := p.getHandlers(topic)
peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
for h := range handlers {
if !h.caps.raw && raw {
log.Warn("norawhandler")
continue
}
if !h.caps.prox && prox {
log.Warn("noproxhandler")
continue
}
err := (h.f)(payload, peer, asymmetric, keyid)
if err != nil {
log.Warn("Pss handler failed", "err", err)
}
}
}
// will return false if using partial address
func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
}
// test match of leftmost bytes in given message to node's Kademlia address
func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
local := p.Kademlia.BaseAddr()
// if a partial address matches we are possible recipient regardless of prox
// if not and prox is not set, we are surely not
if len(msg.To) <= len(local) && bytes.Equal(msg.To, local[:len(msg.To)]) {
return true
} else if !prox {
return false
}
depth := p.Kademlia.NeighbourhoodDepth()
po, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
log.Trace("selfpossible", "po", po, "depth", depth)
return depth <= po
}
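// The recipient test above combines a prefix match on the (possibly partial)
// destination address with a proximity check against the neighbourhood depth.
// The standalone sketch below shows the idea under stated assumptions:
// proximityOrder is a hypothetical helper that counts shared leading bits and
// is only a stand-in for Kademlia.Pof, and depth is passed in directly rather
// than read from a Kademlia table.

```go
package main

import (
	"bytes"
	"fmt"
	"math/bits"
)

// proximityOrder returns the number of leading bits a and b share,
// compared over the shorter of the two slices.
func proximityOrder(a, b []byte) int {
	n := len(a)
	if len(b) < n {
		n = len(b)
	}
	for i := 0; i < n; i++ {
		if x := a[i] ^ b[i]; x != 0 {
			return i*8 + bits.LeadingZeros8(x)
		}
	}
	return n * 8
}

// possibleRecipient reports whether a node at local could be the target of to.
// A partial address that is a prefix of local always matches; otherwise the
// node only qualifies when prox is set and to falls within its neighbourhood.
func possibleRecipient(local, to []byte, prox bool, depth int) bool {
	if len(to) <= len(local) && bytes.Equal(to, local[:len(to)]) {
		return true
	}
	if !prox {
		return false
	}
	return depth <= proximityOrder(local, to)
}

func main() {
	local := []byte{0xAB, 0xCD, 0xEF}
	fmt.Println(possibleRecipient(local, []byte{0xAB, 0xCD}, false, 0)) // true: prefix match
	fmt.Println(possibleRecipient(local, []byte{0xAB, 0xC0}, false, 0)) // false: no match, no prox
	fmt.Println(proximityOrder(local, []byte{0xAB, 0xC0}))              // 12
	fmt.Println(possibleRecipient(local, []byte{0xAB, 0xC0}, true, 10)) // true: 10 <= 12
	fmt.Println(possibleRecipient(local, []byte{0xAB, 0xC0}, true, 13)) // false: 13 > 12
}
```

// An empty destination address is a prefix of every address, so in this model
// every node is a possible recipient of such a message.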
/////////////////////////////////////////////////////////////////////
// SECTION: Encryption
/////////////////////////////////////////////////////////////////////
// Links a peer ECDSA public key to a topic
//
// This is required for asymmetric message exchange
// on the given topic
//
// The value in `address` will be used as a routing hint for the
// public key / topic association
func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error {
if err := validateAddress(address); err != nil {
return err
}
pubkeybytes := crypto.FromECDSAPub(pubkey)
if len(pubkeybytes) == 0 {
return fmt.Errorf("invalid public key: %v", pubkey)
}
pubkeyid := common.ToHex(pubkeybytes)
psp := &pssPeer{
address: address,
}
p.pubKeyPoolMu.Lock()
if _, ok := p.pubKeyPool[pubkeyid]; !ok {
p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
}
p.pubKeyPool[pubkeyid][topic] = psp
p.pubKeyPoolMu.Unlock()
log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address)
return nil
}
// Automatically generate a new symkey for a topic and address hint
func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) {
keyid, err := p.w.GenerateSymKey()
if err != nil {
return "", err
}
p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
return keyid, nil
}
// Links a peer symmetric key (arbitrary byte sequence) to a topic
//
// This is required for symmetrically encrypted message exchange
// on the given topic
//
// The key is stored in the whisper backend.
//
// If addtocache is set to true, the key will be added to the cache of keys
// used to attempt symmetric decryption of incoming messages.
//
// Returns a string id that can be used to retrieve the key bytes
// from the whisper backend (see pss.GetSymmetricKey())
func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) {
if err := validateAddress(address); err != nil {
return "", err
}
return p.setSymmetricKey(key, topic, address, addtocache, true)
}
func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) {
keyid, err := p.w.AddSymKeyDirect(key)
if err != nil {
return "", err
}
p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
return keyid, nil
}
// adds a symmetric key to the pss key pool, and optionally adds the key
// to the collection of keys used to attempt symmetric decryption of
// incoming messages
func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) {
psp := &pssPeer{
address: address,
protected: protected,
}
p.symKeyPoolMu.Lock()
if _, ok := p.symKeyPool[keyid]; !ok {
p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
}
p.symKeyPool[keyid][topic] = psp
p.symKeyPoolMu.Unlock()
if addtocache {
p.symKeyDecryptCacheCursor++
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
}
key, _ := p.GetSymmetricKey(keyid)
log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache)
}
// Returns a symmetric key byte sequence stored in the whisper backend
// by its unique id
//
// Passes on the error value from the whisper backend
func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
symkey, err := p.w.GetSymKey(symkeyid)
if err != nil {
return nil, err
}
return symkey, nil
}
// Returns all recorded topic and address combinations for a specific public key
func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
p.pubKeyPoolMu.RLock()
defer p.pubKeyPoolMu.RUnlock()
for t, peer := range p.pubKeyPool[keyid] {
topic = append(topic, t)
address = append(address, peer.address)
}
return topic, address, nil
}
func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
p.pubKeyPoolMu.RLock()
defer p.pubKeyPoolMu.RUnlock()
if peers, ok := p.pubKeyPool[keyid]; ok {
if t, ok := peers[topic]; ok {
return t.address, nil
}
}
return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
}
// Attempt to decrypt, validate and unpack a
// symmetrically encrypted message
// If successful, returns the unpacked whisper ReceivedMessage struct
// encapsulating the decrypted message, and the whisper backend id
// of the symmetric key used to decrypt the message.
// It fails if decryption of the message fails or if the message is corrupted
func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
symkey, err := p.w.GetSymKey(*symkeyid)
if err != nil {
continue
}
recvmsg, err := envelope.OpenSymmetric(symkey)
if err != nil {
continue
}
if !recvmsg.Validate() {
return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
}
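// the key may be known without a peer entry for this topic; guard against nil
var from PssAddress
p.symKeyPoolMu.Lock()
if psp := p.symKeyPool[*symkeyid][Topic(envelope.Topic)]; psp != nil {
from = psp.address
}
p.symKeyPoolMu.Unlock()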
p.symKeyDecryptCacheCursor++
p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
return recvmsg, *symkeyid, from, nil
}
return nil, "", nil, fmt.Errorf("could not decrypt message")
}
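// The symkey decryption cache is a fixed-size slice addressed by an
// ever-increasing cursor taken modulo the capacity. The sketch below isolates
// that technique; keyRing and its methods are hypothetical names, and it
// stores plain strings where pss stores *string.

```go
package main

import "fmt"

// keyRing is a hypothetical fixed-size cache of key ids, overwritten in
// round-robin order by an ever-increasing cursor.
type keyRing struct {
	slots  []string
	cursor int
}

// newKeyRing allocates a ring with the given capacity, which must be > 0.
func newKeyRing(capacity int) *keyRing {
	return &keyRing{slots: make([]string, capacity)}
}

// touch records id as the most recently used key, overwriting the oldest slot.
func (r *keyRing) touch(id string) {
	r.cursor++
	r.slots[r.cursor%len(r.slots)] = id
}

// recent returns the stored ids, most recently used first. The loop bounds
// mirror processSym: at most one full lap, and never past the first write.
func (r *keyRing) recent() []string {
	var out []string
	for i := r.cursor; i > r.cursor-len(r.slots) && i > 0; i-- {
		out = append(out, r.slots[i%len(r.slots)])
	}
	return out
}

func main() {
	r := newKeyRing(3)
	for _, id := range []string{"a", "b", "c", "d"} {
		r.touch(id)
	}
	fmt.Println(r.recent()) // [d c b]
}
```

// Because the cursor is reduced modulo the capacity, a capacity of zero would
// cause a division-by-zero panic in this sketch, so the capacity should be
// validated before the ring is used.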
// Attempt to decrypt, validate and unpack an
// asymmetrically encrypted message
// The message is decrypted with the node's own private key.
// If successful, returns the unpacked whisper ReceivedMessage struct
// encapsulating the decrypted message, and the hex representation of
// the sender's public key.
// It fails if decryption of message fails, or if the message is corrupted
func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
if err != nil {
return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
}
// check signature (if signed), strip padding
if !recvmsg.Validate() {
return nil, "", nil, fmt.Errorf("invalid message")
}
pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
var from PssAddress
// read-only access, so a read lock is sufficient
p.pubKeyPoolMu.RLock()
if psp := p.pubKeyPool[pubkeyid][Topic(envelope.Topic)]; psp != nil {
from = psp.address
}
p.pubKeyPoolMu.RUnlock()
return recvmsg, pubkeyid, from, nil
}
// cleanKeys performs symkey garbage collection and returns the number of
// removed key/topic entries.
// A key is removed if:
// - it is not marked as protected
// - it is not in the incoming decryption cache
func (p *Pss) cleanKeys() (count int) {
// hold the lock for the whole pass, so that the pool is not iterated
// while another goroutine modifies it
p.symKeyPoolMu.Lock()
defer p.symKeyPoolMu.Unlock()
for keyid, peertopics := range p.symKeyPool {
var expiredtopics []Topic
for topic, psp := range peertopics {
if psp.protected {
continue
}
var match bool
for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
if *cacheid == keyid {
match = true
break
}
}
if !match {
expiredtopics = append(expiredtopics, topic)
}
}
for _, topic := range expiredtopics {
delete(p.symKeyPool[keyid], topic)
log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
count++
}
}
return
}
/////////////////////////////////////////////////////////////////////
// SECTION: Message sending
/////////////////////////////////////////////////////////////////////
// enqueue places the message in the outbox without blocking.
// It fails if the outbox is full.
func (p *Pss) enqueue(msg *PssMsg) error {
select {
case p.outbox <- msg:
return nil
default:
}
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
return errors.New("outbox full")
}
// SendRaw sends a raw message (any encryption is the responsibility of the calling client).
//
// It fails if the address is too long, if the outbox is full,
// or if raw messages are disallowed.
func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
if err := validateAddress(address); err != nil {
return err
}
pssMsgParams := &msgParams{
raw: true,
}
payload := &whisper.Envelope{
Data: msg,
Topic: whisper.TopicType(topic),
}
pssMsg := newPssMsg(pssMsgParams)
pssMsg.To = address
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
pssMsg.Payload = payload
p.addFwdCache(pssMsg)
err := p.enqueue(pssMsg)
if err != nil {
return err
}
// if we have a proxhandler on this topic
// also deliver message to ourselves
if _, ok := p.topicHandlerCaps[topic]; ok {
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
return p.process(pssMsg, true, true)
}
}
return nil
}
// SendSym sends a message using symmetric encryption.
//
// It fails if the key id does not match any of the stored symmetric keys,
// or if the topic is not registered for that key.
func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
symkey, err := p.GetSymmetricKey(symkeyid)
if err != nil {
return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
}
p.symKeyPoolMu.Lock()
psp, ok := p.symKeyPool[symkeyid][topic]
p.symKeyPoolMu.Unlock()
if !ok {
return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
}
return p.send(psp.address, topic, msg, false, symkey)
}
// SendAsym sends a message using asymmetric encryption.
//
// It fails if the key id does not match any of the stored public keys,
// or if the topic is not registered for that key.
func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
// pubkeyid is already a hex string, so print it with %s rather than %x
return fmt.Errorf("Cannot unmarshal pubkey: %s", pubkeyid)
}
// read-only access, so a read lock is sufficient
p.pubKeyPoolMu.RLock()
psp, ok := p.pubKeyPool[pubkeyid][topic]
p.pubKeyPoolMu.RUnlock()
if !ok {
return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
}
return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
}
// send is payload agnostic, and will accept any byte slice as payload.
// It generates a whisper envelope for the specified recipient and topic,
// and wraps the message payload in it.
// TODO: Implement proper message padding
func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
if len(key) == 0 {
return fmt.Errorf("Zero length key passed to pss send")
}
padding := make([]byte, p.paddingByteSize)
c, err := rand.Read(padding)
if err != nil {
return err
} else if c < p.paddingByteSize {
return fmt.Errorf("invalid padding length: %d", c)
}
wparams := &whisper.MessageParams{
TTL: defaultWhisperTTL,
Src: p.privateKey,
Topic: whisper.TopicType(topic),
WorkTime: defaultWhisperWorkTime,
PoW: defaultWhisperPoW,
Payload: msg,
Padding: padding,
}
if asymmetric {
pk, err := crypto.UnmarshalPubkey(key)
if err != nil {
return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
}
wparams.Dst = pk
} else {
wparams.KeySym = key
}
// set up outgoing message container, which does encryption and envelope wrapping
woutmsg, err := whisper.NewSentMessage(wparams)
if err != nil {
return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
}
// Wrap performs the encryption.
// The PoW it performs is negligible due to the very low difficulty setting.
// After this the message is ready for sending.
envelope, err := woutmsg.Wrap(wparams)
if err != nil {
return fmt.Errorf("failed to perform whisper encryption: %v", err)
}
log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
// prepare for devp2p transport
pssMsgParams := &msgParams{
sym: !asymmetric,
}
pssMsg := newPssMsg(pssMsgParams)
pssMsg.To = to
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
pssMsg.Payload = envelope
err = p.enqueue(pssMsg)
if err != nil {
return err
}
if _, ok := p.topicHandlerCaps[topic]; ok {
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
return p.process(pssMsg, true, true)
}
}
return nil
}
// forward forwards a pss message to the peer(s) closest to the recipient address (msg.To) in the PssMsg struct.
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
func (p *Pss) forward(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
to := make([]byte, addressLength)
// copy stops at the shorter of the two slices, so an overlong msg.To cannot cause a slice bounds panic
copy(to, msg.To)
// send with kademlia
// find the closest peer to the recipient and attempt to send
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()
// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
if cap == p.capstring {
ispss = true
break
}
}
if !ispss {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
return true
}
// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
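p.fwdPoolMu.RLock()
pp, ok := p.fwdPool[info.ID]
p.fwdPoolMu.RUnlock()
// skip peers that are missing from the forwarding peer cache,
// instead of calling Send on a missing entry
if !ok {
log.Warn("peer not found in forwarding peer cache, skipping", "peer", info.Name)
return true
}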
// attempt to send the message
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
// continue forwarding if:
// - the recipient address is partial and the peer address matches it fully
// (the peer may be the end recipient, but the full address has not been disclosed)
// - the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace("Pss keep forwarding: Partial address + full partial match")
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", sp.Address()))
return true
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false
})
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
log.Error(err.Error())
return err
}
}
// cache the message
p.addFwdCache(msg)
return nil
}
/////////////////////////////////////////////////////////////////////
// SECTION: Caching
/////////////////////////////////////////////////////////////////////
// cleanFwdCache is used to periodically remove expired entries from the forward cache
func (p *Pss) cleanFwdCache() {
metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
for k, v := range p.fwdCache {
if v.expiresAt.Before(time.Now()) {
delete(p.fwdCache, k)
}
}
}
// label returns the first two bytes of b as a hex string.
// b must be at least two bytes long.
func label(b []byte) string {
return fmt.Sprintf("%04x", b[:2])
}
// addFwdCache adds a message to the forward cache,
// or refreshes its expiry time if it is already cached
func (p *Pss) addFwdCache(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
var entry pssCacheEntry
var ok bool
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
digest := p.digest(msg)
if entry, ok = p.fwdCache[digest]; !ok {
entry = pssCacheEntry{}
}
entry.expiresAt = time.Now().Add(p.cacheTTL)
p.fwdCache[digest] = entry
return nil
}
// check if message is in the cache
func (p *Pss) checkFwdCache(msg *PssMsg) bool {
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
digest := p.digest(msg)
entry, ok := p.fwdCache[digest]
if ok {
if entry.expiresAt.After(time.Now()) {
log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
return true
}
metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
}
return false
}
// Digest of message
func (p *Pss) digest(msg *PssMsg) pssDigest {
return p.digestBytes(msg.serialize())
}
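// digestBytes hashes msg with a hasher taken from the hash pool
// and returns the first digestLength bytes of the result
func (p *Pss) digestBytes(msg []byte) pssDigest {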
hasher := p.hashPool.Get().(hash.Hash)
defer p.hashPool.Put(hasher)
hasher.Reset()
hasher.Write(msg)
digest := pssDigest{}
key := hasher.Sum(nil)
copy(digest[:], key[:digestLength])
return digest
}
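// validateAddress fails if the address is longer than addressLength.
// Shorter (partial) addresses are allowed.
func validateAddress(addr PssAddress) error {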
if len(addr) > addressLength {
return errors.New("address too long")
}
return nil
}