@ -7,18 +7,16 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethutil"
ethlogger "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
var logger = ethlogger . NewLogger ( "SERV" )
// ethProtocol represents the ethereum wire protocol
// instance is running on each peer
type ethProtocol struct {
eth backend
td * big . Int
peer * p2p . Peer
id string
rw p2p . MsgReadWriter
}
@ -26,28 +24,21 @@ type ethProtocol struct {
// used as an argument to EthProtocol
type backend interface {
GetTransactions ( ) ( txs [ ] * types . Transaction )
AddTransactions ( txs [ ] * types . Transaction )
AddTransactions ( [ ] * types . Transaction )
GetBlockHashes ( hash [ ] byte , amount uint32 ) ( hashes [ ] [ ] byte )
AddHash ( hash [ ] byte , peer * p2p . Peer ) ( more bool )
AddBlockHashes ( next func ( ) ( [ ] byte , bool ) , peerId string )
GetBlock ( hash [ ] byte ) ( block * types . Block )
AddBlock ( td * big . Int , block * types . Block , peer * p2p . Peer ) ( fetchHashes bool , err error )
AddPeer ( td * big . Int , currentBlock [ ] byte , peer * p2p . Peer ) ( fetchHashes bool )
AddBlock ( block * types . Block , peerId string ) ( err error )
AddPeer ( td * big . Int , currentBlock [ ] byte , peerId string , requestHashes func ( [ ] byte ) error , requestBlocks func ( [ ] [ ] byte ) error , invalidBlock func ( error ) ) ( best bool )
RemovePeer ( peerId string )
Status ( ) ( td * big . Int , currentBlock [ ] byte , genesisBlock [ ] byte )
}
const (
ProtocolVersion = 43
// 0x00 // PoC-1
// 0x01 // PoC-2
// 0x07 // PoC-3
// 0x09 // PoC-4
// 0x17 // PoC-5
// 0x1c // PoC-6
ProtocolVersion = 43
NetworkId = 0
ProtocolLength = uint64 ( 8 )
ProtocolMaxMsgSize = 10 * 1024 * 1024
blockHashesBatchSize = 256
)
// eth protocol message codes
@ -74,7 +65,8 @@ type getBlockHashesMsgData struct {
}
// main entrypoint, wrappers starting a server running the eth protocol
// use this constructor to attach the protocol (class) to server caps
// use this constructor to attach the protocol ("class") to server caps
// the Dev p2p layer then runs the protocol instance on each peer
func EthProtocol ( eth backend ) * p2p . Protocol {
return & p2p . Protocol {
Name : "eth" ,
@ -86,11 +78,14 @@ func EthProtocol(eth backend) *p2p.Protocol {
}
}
// the main loop that handles incoming messages
// note RemovePeer in the post-disconnect hook
func runEthProtocol ( eth backend , peer * p2p . Peer , rw p2p . MsgReadWriter ) ( err error ) {
self := & ethProtocol {
eth : eth ,
rw : rw ,
peer : peer ,
id : ( string ) ( peer . Identity ( ) . Pubkey ( ) ) ,
}
err = self . handleStatus ( )
if err == nil {
@ -98,6 +93,7 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro
for {
err = self . handle ( )
if err != nil {
self . eth . RemovePeer ( self . id )
break
}
}
@ -132,6 +128,7 @@ func (self *ethProtocol) handle() error {
return self . rw . EncodeMsg ( TxMsg , txsInterface ... )
case TxMsg :
// TODO: rework using lazy RLP stream
var txs [ ] * types . Transaction
if err := msg . Decode ( & txs ) ; err != nil {
return ProtocolError ( ErrDecode , "%v" , err )
@ -148,29 +145,26 @@ func (self *ethProtocol) handle() error {
case BlockHashesMsg :
// TODO: redo using lazy decode , this way very inefficient on known chains
// s := rlp.NewListStream(msg.Payload, uint64(msg.Size))
var blockHashes [ ] [ ] byte
if err := msg . Decode ( & blockHashes ) ; err != nil {
return ProtocolError ( ErrDecode , "%v" , err )
}
fetchMore := true
for _ , hash := range blockHashes {
fetchMore = self . eth . AddHash ( hash , self . peer )
if ! fetchMore {
break
msgStream := rlp . NewListStream ( msg . Payload , uint64 ( msg . Size ) )
var err error
iter := func ( ) ( hash [ ] byte , ok bool ) {
hash , err = msgStream . Bytes ( )
if err == nil {
ok = true
}
return
}
if fetchMore {
return self . FetchHashes ( blockHashes [ len ( blockHashes ) - 1 ] )
self . eth . AddBlockHashes ( iter , self . id )
if err != nil && err != rlp . EOL {
return ProtocolError ( ErrDecode , "%v" , err )
}
case GetBlocksMsg :
// Limit to max 300 blocks
var blockHashes [ ] [ ] byte
if err := msg . Decode ( & blockHashes ) ; err != nil {
return ProtocolError ( ErrDecode , "%v" , err )
}
max := int ( math . Min ( float64 ( len ( blockHashes ) ) , 300.0 ) )
max := int ( math . Min ( float64 ( len ( blockHashes ) ) , blockHashesBatchSize ) )
var blocks [ ] interface { }
for i , hash := range blockHashes {
if i >= max {
@ -184,20 +178,19 @@ func (self *ethProtocol) handle() error {
return self . rw . EncodeMsg ( BlocksMsg , blocks ... )
case BlocksMsg :
var blocks [ ] * types . Block
if err := msg . Decode ( & blocks ) ; err != nil {
return ProtocolError ( ErrDecode , "%v" , err )
}
for _ , block := range blocks {
fetchHashes , err := self . eth . AddBlock ( nil , block , self . peer )
if err != nil {
return ProtocolError ( ErrInvalidBlock , "%v" , err )
}
if fetchHashes {
if err := self . FetchHashes ( block . Hash ( ) ) ; err != nil {
return err
msgStream := rlp . NewListStream ( msg . Payload , uint64 ( msg . Size ) )
for {
var block * types . Block
if err := msgStream . Decode ( & block ) ; err != nil {
if err == rlp . EOL {
break
} else {
return ProtocolError ( ErrDecode , "%v" , err )
}
}
if err := self . eth . AddBlock ( block , self . id ) ; err != nil {
return ProtocolError ( ErrInvalidBlock , "%v" , err )
}
}
case NewBlockMsg :
@ -205,13 +198,24 @@ func (self *ethProtocol) handle() error {
if err := msg . Decode ( & request ) ; err != nil {
return ProtocolError ( ErrDecode , "%v" , err )
}
var fetchHashes bool
// this should reset td and offer blockpool as candidate new peer?
if fetchHashes , err = self . eth . AddBlock ( request . TD , request . Block , self . peer ) ; err != nil {
return ProtocolError ( ErrInvalidBlock , "%v" , err )
}
if fetchHashes {
return self . FetchHashes ( request . Block . Hash ( ) )
hash := request . Block . Hash ( )
// to simplify backend interface adding a new block
// uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer
// (or selected as new best peer)
if self . eth . AddPeer ( request . TD , hash , self . id , self . requestBlockHashes , self . requestBlocks , self . invalidBlock ) {
called := true
iter := func ( ) ( hash [ ] byte , ok bool ) {
if called {
called = false
return hash , true
} else {
return
}
}
self . eth . AddBlockHashes ( iter , self . id )
if err := self . eth . AddBlock ( request . Block , self . id ) ; err != nil {
return ProtocolError ( ErrInvalidBlock , "%v" , err )
}
}
default :
@ -279,16 +283,34 @@ func (self *ethProtocol) handleStatus() error {
return ProtocolError ( ErrProtocolVersionMismatch , "%d (!= %d)" , status . ProtocolVersion , ProtocolVersion )
}
logg er. Infof ( "Peer is [eth] capable (%d/%d). TD = %v ~ %x" , status . ProtocolVersion , status . NetworkId , status . CurrentBlock )
self . pe er. Infof ( "Peer is [eth] capable (%d/%d). TD = %v ~ %x" , status . ProtocolVersion , status . NetworkId , status . CurrentBlock )
if self . eth . AddPeer ( status . TD , status . CurrentBlock , self . peer ) {
return self . FetchHashes ( status . CurrentBlock )
}
self . eth . AddPeer ( status . TD , status . CurrentBlock , self . id , self . requestBlockHashes , self . requestBlocks , self . invalidBlock )
return nil
}
func ( self * ethProtocol ) Fetch Hashes( from [ ] byte ) error {
logg er. Debugf ( "F etching hashes (%d) %x...\n" , blockHashesBatchSize , from [ 0 : 4 ] )
func ( self * ethProtocol ) requestBlock Hashes( from [ ] byte ) error {
self . pe er. Debugf ( "f etching hashes (%d) %x...\n" , blockHashesBatchSize , from [ 0 : 4 ] )
return self . rw . EncodeMsg ( GetBlockHashesMsg , from , blockHashesBatchSize )
}
func ( self * ethProtocol ) requestBlocks ( hashes [ ] [ ] byte ) error {
self . peer . Debugf ( "fetching %v blocks" , len ( hashes ) )
return self . rw . EncodeMsg ( GetBlocksMsg , ethutil . ByteSliceToInterface ( hashes ) )
}
func ( self * ethProtocol ) invalidBlock ( err error ) {
ProtocolError ( ErrInvalidBlock , "%v" , err )
self . peer . Disconnect ( p2p . DiscSubprotocolError )
}
func ( self * ethProtocol ) protoError ( code int , format string , params ... interface { } ) ( err * protocolError ) {
err = ProtocolError ( code , format , params ... )
if err . Fatal ( ) {
self . peer . Errorln ( err )
} else {
self . peer . Debugln ( err )
}
return
}