forked from mirror/go-ethereum
commit
71aa5fe8a3
@ -0,0 +1,79 @@ |
||||
package downloader |
||||
|
||||
import ( |
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/logger" |
||||
"github.com/ethereum/go-ethereum/logger/glog" |
||||
) |
||||
|
||||
// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS
|
||||
|
||||
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
|
||||
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
|
||||
// checks fail an error will be returned. This method is synchronous
|
||||
func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) { |
||||
// Check if we're busy
|
||||
if d.isBusy() { |
||||
return nil, errBusy |
||||
} |
||||
|
||||
// Attempt to select a peer. This can either be nothing, which returns, best peer
|
||||
// or selected peer. If no peer could be found an error will be returned
|
||||
var p *peer |
||||
if len(id) == 0 { |
||||
p = d.peers[id] |
||||
if p == nil { |
||||
return nil, errUnknownPeer |
||||
} |
||||
} else { |
||||
p = d.peers.bestPeer() |
||||
} |
||||
|
||||
// Make sure our td is lower than the peer's td
|
||||
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { |
||||
return nil, errLowTd |
||||
} |
||||
|
||||
// Get the hash from the peer and initiate the downloading progress.
|
||||
err := d.getFromPeer(p, p.recentHash, false) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return d.queue.blocks, nil |
||||
} |
||||
|
||||
// Synchronise will synchronise using the best peer.
|
||||
func (d *Downloader) Synchronise() (types.Blocks, error) { |
||||
return d.SynchroniseWithPeer("") |
||||
} |
||||
|
||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { |
||||
d.activePeer = p.id |
||||
|
||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) |
||||
// Start the fetcher. This will block the update entirely
|
||||
// interupts need to be send to the appropriate channels
|
||||
// respectively.
|
||||
if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil { |
||||
// handle error
|
||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err) |
||||
// XXX Reset
|
||||
return err |
||||
} |
||||
|
||||
// Start fetching blocks in paralel. The strategy is simple
|
||||
// take any available peers, seserve a chunk for each peer available,
|
||||
// let the peer deliver the chunkn and periodically check if a peer
|
||||
// has timedout. When done downloading, process blocks.
|
||||
if err := d.startFetchingBlocks(p); err != nil { |
||||
glog.V(logger.Debug).Infoln("Error downloading blocks:", err) |
||||
// XXX reset
|
||||
return err |
||||
} |
||||
|
||||
glog.V(logger.Detail).Infoln("Sync completed") |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,334 @@ |
||||
package eth |
||||
|
||||
// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change
|
||||
// The idea is that most of the calls within the protocol will become synchronous.
|
||||
// Block downloading and block processing will be complete seperate processes
|
||||
/* |
||||
# Possible scenarios |
||||
|
||||
// Synching scenario
|
||||
// Use the best peer to synchronise
|
||||
blocks, err := pm.downloader.Synchronise() |
||||
if err != nil { |
||||
// handle
|
||||
break |
||||
} |
||||
pm.chainman.InsertChain(blocks) |
||||
|
||||
// Receiving block with known parent
|
||||
if parent_exist { |
||||
if err := pm.chainman.InsertChain(block); err != nil { |
||||
// handle
|
||||
break |
||||
} |
||||
pm.BroadcastBlock(block) |
||||
} |
||||
|
||||
// Receiving block with unknown parent
|
||||
blocks, err := pm.downloader.SynchroniseWithPeer(peer) |
||||
if err != nil { |
||||
// handle
|
||||
break |
||||
} |
||||
pm.chainman.InsertChain(blocks) |
||||
|
||||
*/ |
||||
|
||||
import ( |
||||
"fmt" |
||||
"math" |
||||
"sync" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/core" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/eth/downloader" |
||||
"github.com/ethereum/go-ethereum/logger" |
||||
"github.com/ethereum/go-ethereum/logger/glog" |
||||
"github.com/ethereum/go-ethereum/p2p" |
||||
"github.com/ethereum/go-ethereum/rlp" |
||||
) |
||||
|
||||
func errResp(code errCode, format string, v ...interface{}) error { |
||||
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) |
||||
} |
||||
|
||||
type hashFetcherFn func(common.Hash) error |
||||
type blockFetcherFn func([]common.Hash) error |
||||
|
||||
// extProt is an interface which is passed around so we can expose GetHashes and GetBlock without exposing it to the rest of the protocol
|
||||
// extProt is passed around to peers which require to GetHashes and GetBlocks
|
||||
type extProt struct { |
||||
getHashes hashFetcherFn |
||||
getBlocks blockFetcherFn |
||||
} |
||||
|
||||
func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(hash) } |
||||
func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) } |
||||
|
||||
type ProtocolManager struct { |
||||
protVer, netId int |
||||
txpool txPool |
||||
chainman *core.ChainManager |
||||
downloader *downloader.Downloader |
||||
|
||||
pmu sync.Mutex |
||||
peers map[string]*peer |
||||
|
||||
SubProtocol p2p.Protocol |
||||
} |
||||
|
||||
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
||||
// with the ethereum network.
|
||||
func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { |
||||
manager := &ProtocolManager{ |
||||
txpool: txpool, |
||||
chainman: chainman, |
||||
downloader: downloader, |
||||
peers: make(map[string]*peer), |
||||
} |
||||
|
||||
manager.SubProtocol = p2p.Protocol{ |
||||
Name: "eth", |
||||
Version: uint(protocolVersion), |
||||
Length: ProtocolLength, |
||||
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { |
||||
peer := manager.newPeer(protocolVersion, networkId, p, rw) |
||||
err := manager.handle(peer) |
||||
//glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
|
||||
|
||||
return err |
||||
}, |
||||
} |
||||
|
||||
return manager |
||||
} |
||||
|
||||
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { |
||||
|
||||
td, current, genesis := pm.chainman.Status() |
||||
|
||||
return newPeer(pv, nv, genesis, current, td, p, rw) |
||||
} |
||||
|
||||
func (pm *ProtocolManager) handle(p *peer) error { |
||||
if err := p.handleStatus(); err != nil { |
||||
return err |
||||
} |
||||
pm.pmu.Lock() |
||||
pm.peers[p.id] = p |
||||
pm.pmu.Unlock() |
||||
|
||||
pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks) |
||||
defer func() { |
||||
pm.pmu.Lock() |
||||
defer pm.pmu.Unlock() |
||||
delete(pm.peers, p.id) |
||||
pm.downloader.UnregisterPeer(p.id) |
||||
}() |
||||
|
||||
// propagate existing transactions. new transactions appearing
|
||||
// after this will be sent via broadcasts.
|
||||
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// main loop. handle incoming messages.
|
||||
for { |
||||
if err := pm.handleMsg(p); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (self *ProtocolManager) handleMsg(p *peer) error { |
||||
msg, err := p.rw.ReadMsg() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if msg.Size > ProtocolMaxMsgSize { |
||||
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) |
||||
} |
||||
// make sure that the payload has been fully consumed
|
||||
defer msg.Discard() |
||||
|
||||
switch msg.Code { |
||||
case GetTxMsg: // ignore
|
||||
case StatusMsg: |
||||
return errResp(ErrExtraStatusMsg, "uncontrolled status message") |
||||
|
||||
case TxMsg: |
||||
// TODO: rework using lazy RLP stream
|
||||
var txs []*types.Transaction |
||||
if err := msg.Decode(&txs); err != nil { |
||||
return errResp(ErrDecode, "msg %v: %v", msg, err) |
||||
} |
||||
for i, tx := range txs { |
||||
if tx == nil { |
||||
return errResp(ErrDecode, "transaction %d is nil", i) |
||||
} |
||||
jsonlogger.LogJson(&logger.EthTxReceived{ |
||||
TxHash: tx.Hash().Hex(), |
||||
RemoteId: p.ID().String(), |
||||
}) |
||||
} |
||||
self.txpool.AddTransactions(txs) |
||||
|
||||
case GetBlockHashesMsg: |
||||
var request getBlockHashesMsgData |
||||
if err := msg.Decode(&request); err != nil { |
||||
return errResp(ErrDecode, "->msg %v: %v", msg, err) |
||||
} |
||||
|
||||
if request.Amount > maxHashes { |
||||
request.Amount = maxHashes |
||||
} |
||||
|
||||
hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) |
||||
|
||||
if glog.V(logger.Debug) { |
||||
if len(hashes) == 0 { |
||||
glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4]) |
||||
} |
||||
} |
||||
|
||||
// returns either requested hashes or nothing (i.e. not found)
|
||||
return p.sendBlockHashes(hashes) |
||||
case BlockHashesMsg: |
||||
msgStream := rlp.NewStream(msg.Payload) |
||||
|
||||
var hashes []common.Hash |
||||
if err := msgStream.Decode(&hashes); err != nil { |
||||
break |
||||
} |
||||
err := self.downloader.AddHashes(p.id, hashes) |
||||
if err != nil { |
||||
glog.V(logger.Debug).Infoln(err) |
||||
} |
||||
|
||||
case GetBlocksMsg: |
||||
msgStream := rlp.NewStream(msg.Payload) |
||||
if _, err := msgStream.List(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
var blocks []*types.Block |
||||
var i int |
||||
for { |
||||
i++ |
||||
var hash common.Hash |
||||
err := msgStream.Decode(&hash) |
||||
if err == rlp.EOL { |
||||
break |
||||
} else if err != nil { |
||||
return errResp(ErrDecode, "msg %v: %v", msg, err) |
||||
} |
||||
|
||||
block := self.chainman.GetBlock(hash) |
||||
if block != nil { |
||||
blocks = append(blocks, block) |
||||
} |
||||
if i == maxBlocks { |
||||
break |
||||
} |
||||
} |
||||
return p.sendBlocks(blocks) |
||||
case BlocksMsg: |
||||
msgStream := rlp.NewStream(msg.Payload) |
||||
|
||||
var blocks []*types.Block |
||||
if err := msgStream.Decode(&blocks); err != nil { |
||||
glog.V(logger.Detail).Infoln("Decode error", err) |
||||
blocks = nil |
||||
} |
||||
self.downloader.DeliverChunk(p.id, blocks) |
||||
|
||||
case NewBlockMsg: |
||||
var request newBlockMsgData |
||||
if err := msg.Decode(&request); err != nil { |
||||
return errResp(ErrDecode, "%v: %v", msg, err) |
||||
} |
||||
if err := request.Block.ValidateFields(); err != nil { |
||||
return errResp(ErrDecode, "block validation %v: %v", msg, err) |
||||
} |
||||
hash := request.Block.Hash() |
||||
// Add the block hash as a known hash to the peer. This will later be used to detirmine
|
||||
// who should receive this.
|
||||
p.blockHashes.Add(hash) |
||||
|
||||
_, chainHead, _ := self.chainman.Status() |
||||
|
||||
jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{ |
||||
BlockHash: hash.Hex(), |
||||
BlockNumber: request.Block.Number(), // this surely must be zero
|
||||
ChainHeadHash: chainHead.Hex(), |
||||
BlockPrevHash: request.Block.ParentHash().Hex(), |
||||
RemoteId: p.ID().String(), |
||||
}) |
||||
|
||||
// Make sure the block isn't already known. If this is the case simply drop
|
||||
// the message and move on. If the TD is < currentTd; drop it as well. If this
|
||||
// chain at some point becomes canonical, the downloader will fetch it.
|
||||
if self.chainman.HasBlock(hash) { |
||||
break |
||||
} |
||||
/* XXX unsure about this |
||||
if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 { |
||||
glog.V(logger.Debug).Infoln("dropped block", request.Block.Number(), "due to low TD", request.TD) |
||||
break |
||||
} |
||||
*/ |
||||
|
||||
// Attempt to insert the newly received by checking if the parent exists.
|
||||
// if the parent exists we process the block and propagate to our peers
|
||||
// if the parent does not exists we delegate to the downloader.
|
||||
// NOTE we can reduce chatter by dropping blocks with Td < currentTd
|
||||
if self.chainman.HasBlock(request.Block.ParentHash()) { |
||||
if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { |
||||
// handle error
|
||||
return nil |
||||
} |
||||
self.BroadcastBlock(hash, request.Block) |
||||
//fmt.Println(request.Block.Hash().Hex(), "our calculated TD =", request.Block.Td, "their TD =", request.TD)
|
||||
} else { |
||||
// adding blocks is synchronous
|
||||
go func() { |
||||
err := self.downloader.AddBlock(p.id, request.Block, request.TD) |
||||
if err != nil { |
||||
glog.V(logger.Detail).Infoln("downloader err:", err) |
||||
return |
||||
} |
||||
self.BroadcastBlock(hash, request.Block) |
||||
//fmt.Println(request.Block.Hash().Hex(), "our calculated TD =", request.Block.Td, "their TD =", request.TD)
|
||||
}() |
||||
} |
||||
default: |
||||
return errResp(ErrInvalidMsgCode, "%v", msg.Code) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// BroadcastBlock will propagate the block to its connected peers. It will sort
|
||||
// out which peers do not contain the block in their block set and will do a
|
||||
// sqrt(peers) to determine the amount of peers we broadcast to.
|
||||
func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) { |
||||
pm.pmu.Lock() |
||||
defer pm.pmu.Unlock() |
||||
|
||||
// Find peers who don't know anything about the given hash. Peers that
|
||||
// don't know about the hash will be a candidate for the broadcast loop
|
||||
var peers []*peer |
||||
for _, peer := range pm.peers { |
||||
if !peer.blockHashes.Has(hash) { |
||||
peers = append(peers, peer) |
||||
} |
||||
} |
||||
// Broadcast block to peer set
|
||||
peers = peers[:int(math.Sqrt(float64(len(peers))))] |
||||
for _, peer := range peers { |
||||
peer.sendNewBlock(block) |
||||
} |
||||
glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") |
||||
} |
@ -0,0 +1,143 @@ |
||||
package eth |
||||
|
||||
import ( |
||||
"fmt" |
||||
"math/big" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/p2p" |
||||
"gopkg.in/fatih/set.v0" |
||||
) |
||||
|
||||
type statusMsgData struct { |
||||
ProtocolVersion uint32 |
||||
NetworkId uint32 |
||||
TD *big.Int |
||||
CurrentBlock common.Hash |
||||
GenesisBlock common.Hash |
||||
} |
||||
|
||||
type getBlockHashesMsgData struct { |
||||
Hash common.Hash |
||||
Amount uint64 |
||||
} |
||||
|
||||
type peer struct { |
||||
*p2p.Peer |
||||
|
||||
rw p2p.MsgReadWriter |
||||
|
||||
protv, netid int |
||||
|
||||
currentHash common.Hash |
||||
id string |
||||
td *big.Int |
||||
|
||||
genesis, ourHash common.Hash |
||||
ourTd *big.Int |
||||
|
||||
txHashes *set.Set |
||||
blockHashes *set.Set |
||||
} |
||||
|
||||
func newPeer(protv, netid int, genesis, currentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { |
||||
id := p.ID() |
||||
|
||||
return &peer{ |
||||
Peer: p, |
||||
rw: rw, |
||||
genesis: genesis, |
||||
ourHash: currentHash, |
||||
ourTd: td, |
||||
protv: protv, |
||||
netid: netid, |
||||
id: fmt.Sprintf("%x", id[:8]), |
||||
txHashes: set.New(), |
||||
blockHashes: set.New(), |
||||
} |
||||
} |
||||
|
||||
// sendTransactions sends transactions to the peer and includes the hashes
|
||||
// in it's tx hash set for future reference. The tx hash will allow the
|
||||
// manager to check whether the peer has already received this particular
|
||||
// transaction
|
||||
func (p *peer) sendTransactions(txs types.Transactions) error { |
||||
for _, tx := range txs { |
||||
p.txHashes.Add(tx.Hash()) |
||||
} |
||||
|
||||
return p2p.Send(p.rw, TxMsg, txs) |
||||
} |
||||
|
||||
func (p *peer) sendBlockHashes(hashes []common.Hash) error { |
||||
return p2p.Send(p.rw, BlockHashesMsg, hashes) |
||||
} |
||||
|
||||
func (p *peer) sendBlocks(blocks []*types.Block) error { |
||||
return p2p.Send(p.rw, BlocksMsg, blocks) |
||||
} |
||||
|
||||
func (p *peer) sendNewBlock(block *types.Block) error { |
||||
p.blockHashes.Add(block.Hash()) |
||||
|
||||
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, block.Td}) |
||||
} |
||||
|
||||
func (p *peer) requestHashes(from common.Hash) error { |
||||
p.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4]) |
||||
return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) |
||||
} |
||||
|
||||
func (p *peer) requestBlocks(hashes []common.Hash) error { |
||||
p.Debugf("fetching %v blocks", len(hashes)) |
||||
return p2p.Send(p.rw, GetBlocksMsg, hashes) |
||||
} |
||||
|
||||
func (p *peer) handleStatus() error { |
||||
errc := make(chan error, 1) |
||||
go func() { |
||||
errc <- p2p.Send(p.rw, StatusMsg, &statusMsgData{ |
||||
ProtocolVersion: uint32(p.protv), |
||||
NetworkId: uint32(p.netid), |
||||
TD: p.ourTd, |
||||
CurrentBlock: p.ourHash, |
||||
GenesisBlock: p.genesis, |
||||
}) |
||||
}() |
||||
|
||||
// read and handle remote status
|
||||
msg, err := p.rw.ReadMsg() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if msg.Code != StatusMsg { |
||||
return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) |
||||
} |
||||
if msg.Size > ProtocolMaxMsgSize { |
||||
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) |
||||
} |
||||
|
||||
var status statusMsgData |
||||
if err := msg.Decode(&status); err != nil { |
||||
return errResp(ErrDecode, "msg %v: %v", msg, err) |
||||
} |
||||
|
||||
if status.GenesisBlock != p.genesis { |
||||
return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, p.genesis) |
||||
} |
||||
|
||||
if int(status.NetworkId) != p.netid { |
||||
return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, p.netid) |
||||
} |
||||
|
||||
if int(status.ProtocolVersion) != p.protv { |
||||
return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.protv) |
||||
} |
||||
// Set the total difficulty of the peer
|
||||
p.td = status.TD |
||||
// set the best hash of the peer
|
||||
p.currentHash = status.CurrentBlock |
||||
|
||||
return <-errc |
||||
} |
Loading…
Reference in new issue