|
|
|
@ -37,8 +37,24 @@ var ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
|
|
|
|
|
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
|
|
|
|
|
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
|
|
|
|
|
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
|
|
|
|
|
|
|
|
|
|
// maxQueuedTxs is the maximum number of transaction lists to queue up before
|
|
|
|
|
// dropping broadcasts. This is a sensitive number as a transaction list might
|
|
|
|
|
// contain a single transaction, or thousands.
|
|
|
|
|
maxQueuedTxs = 128 |
|
|
|
|
|
|
|
|
|
// maxQueuedProps is the maximum number of block propagations to queue up before
|
|
|
|
|
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
|
|
|
|
|
// that might cover uncles should be enough.
|
|
|
|
|
maxQueuedProps = 4 |
|
|
|
|
|
|
|
|
|
// maxQueuedAnns is the maximum number of block announcements to queue up before
|
|
|
|
|
// dropping broadcasts. Similarly to block propagations, there's no point to queue
|
|
|
|
|
// above some healthy uncle limit, so use that.
|
|
|
|
|
maxQueuedAnns = 4 |
|
|
|
|
|
|
|
|
|
handshakeTimeout = 5 * time.Second |
|
|
|
|
) |
|
|
|
|
|
|
|
|
@ -50,6 +66,12 @@ type PeerInfo struct { |
|
|
|
|
Head string `json:"head"` // SHA3 hash of the peer's best owned block
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// propEvent is a block propagation, waiting for its turn in the broadcast queue.
|
|
|
|
|
type propEvent struct { |
|
|
|
|
block *types.Block |
|
|
|
|
td *big.Int |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type peer struct { |
|
|
|
|
id string |
|
|
|
|
|
|
|
|
@ -63,23 +85,64 @@ type peer struct { |
|
|
|
|
td *big.Int |
|
|
|
|
lock sync.RWMutex |
|
|
|
|
|
|
|
|
|
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
|
|
|
|
|
knownBlocks *set.Set // Set of block hashes known to be known by this peer
|
|
|
|
|
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
|
|
|
|
|
knownBlocks *set.Set // Set of block hashes known to be known by this peer
|
|
|
|
|
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
|
|
|
|
|
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
|
|
|
|
|
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
|
|
|
|
|
term chan struct{} // Termination channel to stop the broadcaster
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { |
|
|
|
|
id := p.ID() |
|
|
|
|
|
|
|
|
|
return &peer{ |
|
|
|
|
Peer: p, |
|
|
|
|
rw: rw, |
|
|
|
|
version: version, |
|
|
|
|
id: fmt.Sprintf("%x", id[:8]), |
|
|
|
|
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), |
|
|
|
|
knownTxs: set.New(), |
|
|
|
|
knownBlocks: set.New(), |
|
|
|
|
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), |
|
|
|
|
queuedProps: make(chan *propEvent, maxQueuedProps), |
|
|
|
|
queuedAnns: make(chan *types.Block, maxQueuedAnns), |
|
|
|
|
term: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// broadcast is a write loop that multiplexes block propagations, announcements
|
|
|
|
|
// and transaction broadcasts into the remote peer. The goal is to have an async
|
|
|
|
|
// writer that does not lock up node internals.
|
|
|
|
|
func (p *peer) broadcast() { |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case txs := <-p.queuedTxs: |
|
|
|
|
if err := p.SendTransactions(txs); err != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
p.Log().Trace("Broadcast transactions", "count", len(txs)) |
|
|
|
|
|
|
|
|
|
case prop := <-p.queuedProps: |
|
|
|
|
if err := p.SendNewBlock(prop.block, prop.td); err != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) |
|
|
|
|
|
|
|
|
|
case block := <-p.queuedAnns: |
|
|
|
|
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) |
|
|
|
|
|
|
|
|
|
case <-p.term: |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// close signals the broadcast goroutine to terminate.
|
|
|
|
|
func (p *peer) close() { |
|
|
|
|
close(p.term) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Info gathers and returns a collection of metadata known about a peer.
|
|
|
|
|
func (p *peer) Info() *PeerInfo { |
|
|
|
|
hash, td := p.Head() |
|
|
|
@ -139,6 +202,19 @@ func (p *peer) SendTransactions(txs types.Transactions) error { |
|
|
|
|
return p2p.Send(p.rw, TxMsg, txs) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AsyncSendTransactions queues list of transactions propagation to a remote
|
|
|
|
|
// peer. If the peer's broadcast queue is full, the event is silently dropped.
|
|
|
|
|
func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { |
|
|
|
|
select { |
|
|
|
|
case p.queuedTxs <- txs: |
|
|
|
|
for _, tx := range txs { |
|
|
|
|
p.knownTxs.Add(tx.Hash()) |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
p.Log().Debug("Dropping transaction propagation", "count", len(txs)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SendNewBlockHashes announces the availability of a number of blocks through
|
|
|
|
|
// a hash notification.
|
|
|
|
|
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { |
|
|
|
@ -153,12 +229,35 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error |
|
|
|
|
return p2p.Send(p.rw, NewBlockHashesMsg, request) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AsyncSendNewBlockHash queues the availability of a block for propagation to a
|
|
|
|
|
// remote peer. If the peer's broadcast queue is full, the event is silently
|
|
|
|
|
// dropped.
|
|
|
|
|
func (p *peer) AsyncSendNewBlockHash(block *types.Block) { |
|
|
|
|
select { |
|
|
|
|
case p.queuedAnns <- block: |
|
|
|
|
p.knownBlocks.Add(block.Hash()) |
|
|
|
|
default: |
|
|
|
|
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SendNewBlock propagates an entire block to a remote peer.
|
|
|
|
|
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { |
|
|
|
|
p.knownBlocks.Add(block.Hash()) |
|
|
|
|
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
|
|
|
|
|
// the peer's broadcast queue is full, the event is silently dropped.
|
|
|
|
|
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { |
|
|
|
|
select { |
|
|
|
|
case p.queuedProps <- &propEvent{block: block, td: td}: |
|
|
|
|
p.knownBlocks.Add(block.Hash()) |
|
|
|
|
default: |
|
|
|
|
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SendBlockHeaders sends a batch of block headers to the remote peer.
|
|
|
|
|
func (p *peer) SendBlockHeaders(headers []*types.Header) error { |
|
|
|
|
return p2p.Send(p.rw, BlockHeadersMsg, headers) |
|
|
|
@ -313,7 +412,8 @@ func newPeerSet() *peerSet { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Register injects a new peer into the working set, or returns an error if the
|
|
|
|
|
// peer is already known.
|
|
|
|
|
// peer is already known. If a new peer it registered, its broadcast loop is also
|
|
|
|
|
// started.
|
|
|
|
|
func (ps *peerSet) Register(p *peer) error { |
|
|
|
|
ps.lock.Lock() |
|
|
|
|
defer ps.lock.Unlock() |
|
|
|
@ -325,6 +425,8 @@ func (ps *peerSet) Register(p *peer) error { |
|
|
|
|
return errAlreadyRegistered |
|
|
|
|
} |
|
|
|
|
ps.peers[p.id] = p |
|
|
|
|
go p.broadcast() |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -334,10 +436,13 @@ func (ps *peerSet) Unregister(id string) error { |
|
|
|
|
ps.lock.Lock() |
|
|
|
|
defer ps.lock.Unlock() |
|
|
|
|
|
|
|
|
|
if _, ok := ps.peers[id]; !ok { |
|
|
|
|
p, ok := ps.peers[id] |
|
|
|
|
if !ok { |
|
|
|
|
return errNotRegistered |
|
|
|
|
} |
|
|
|
|
delete(ps.peers, id) |
|
|
|
|
p.close() |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|