@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
@ -41,7 +42,9 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"golang.org/x/crypto/sha3"
)
)
const (
const (
@ -84,6 +87,7 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
// node network handler.
type handlerConfig struct {
type handlerConfig struct {
NodeID enode . ID // P2P node ID used for tx propagation topology
Database ethdb . Database // Database for direct sync insertions
Database ethdb . Database // Database for direct sync insertions
Chain * core . BlockChain // Blockchain to serve data from
Chain * core . BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
TxPool txPool // Transaction pool to propagate from
@ -96,6 +100,7 @@ type handlerConfig struct {
}
}
type handler struct {
type handler struct {
nodeID enode . ID
networkID uint64
networkID uint64
forkFilter forkid . Filter // Fork ID filter, constant across the lifetime of the node
forkFilter forkid . Filter // Fork ID filter, constant across the lifetime of the node
@ -137,6 +142,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
config . EventMux = new ( event . TypeMux ) // Nicety initialization for tests
config . EventMux = new ( event . TypeMux ) // Nicety initialization for tests
}
}
h := & handler {
h := & handler {
nodeID : config . NodeID ,
networkID : config . Network ,
networkID : config . Network ,
forkFilter : forkid . NewFilter ( config . Chain ) ,
forkFilter : forkid . NewFilter ( config . Chain ) ,
eventMux : config . EventMux ,
eventMux : config . EventMux ,
@ -614,25 +620,54 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
annos = make ( map [ * ethPeer ] [ ] common . Hash ) // Set peer->hash to announce
annos = make ( map [ * ethPeer ] [ ] common . Hash ) // Set peer->hash to announce
)
)
// Broadcast transactions to a batch of peers not knowing about it
// Broadcast transactions to a batch of peers not knowing about it
for _ , tx := range txs {
direct := big . NewInt ( int64 ( math . Sqrt ( float64 ( h . peers . len ( ) ) ) ) ) // Approximate number of peers to broadcast to
peers := h . peers . peersWithoutTransaction ( tx . Hash ( ) )
if direct . BitLen ( ) == 0 {
direct = big . NewInt ( 1 )
}
total := new ( big . Int ) . Exp ( direct , big . NewInt ( 2 ) , nil ) // Stabilise total peer count a bit based on sqrt peers
var numDirect int
var (
signer = types . LatestSignerForChainID ( h . chain . Config ( ) . ChainID ) // Don't care about chain status, we just need *a* sender
hasher = sha3 . NewLegacyKeccak256 ( ) . ( crypto . KeccakState )
hash = make ( [ ] byte , 32 )
)
for _ , tx := range txs {
var maybeDirect bool
switch {
switch {
case tx . Type ( ) == types . BlobTxType :
case tx . Type ( ) == types . BlobTxType :
blobTxs ++
blobTxs ++
case tx . Size ( ) > txMaxBroadcastSize :
case tx . Size ( ) > txMaxBroadcastSize :
largeTxs ++
largeTxs ++
default :
default :
nu mDirect = int ( math . Sqrt ( float64 ( len ( peers ) ) ) )
maybe Direct = true
}
}
// Send the tx unconditionally to a subset of our peers
// Send the transaction (if it's small enough) directly to a subset of
for _ , peer := range peers [ : numDirect ] {
// the peers that have not received it yet, ensuring that the flow of
txset [ peer ] = append ( txset [ peer ] , tx . Hash ( ) )
// transactions is groupped by account to (try and) avoid nonce gaps.
}
//
// For the remaining peers, send announcement only
// To do this, we hash the local enode IW with together with a peer's
for _ , peer := range peers [ numDirect : ] {
// enode ID together with the transaction sender and broadcast if
annos [ peer ] = append ( annos [ peer ] , tx . Hash ( ) )
// `sha(self, peer, sender) mod peers < sqrt(peers)`.
for _ , peer := range h . peers . peersWithoutTransaction ( tx . Hash ( ) ) {
var broadcast bool
if maybeDirect {
hasher . Reset ( )
hasher . Write ( h . nodeID . Bytes ( ) )
hasher . Write ( peer . Node ( ) . ID ( ) . Bytes ( ) )
from , _ := types . Sender ( signer , tx ) // Ignore error, we only use the addr as a propagation target splitter
hasher . Write ( from . Bytes ( ) )
hasher . Read ( hash )
if new ( big . Int ) . Mod ( new ( big . Int ) . SetBytes ( hash ) , total ) . Cmp ( direct ) < 0 {
broadcast = true
}
}
if broadcast {
txset [ peer ] = append ( txset [ peer ] , tx . Hash ( ) )
} else {
annos [ peer ] = append ( annos [ peer ] , tx . Hash ( ) )
}
}
}
}
}
for peer , hashes := range txset {
for peer , hashes := range txset {