mirror of https://github.com/ethereum/go-ethereum
commit
6a9730edaa
@ -0,0 +1,311 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
/* |
||||||
|
Package protocols is an extension to p2p. It offers a user friendly simple way to define |
||||||
|
devp2p subprotocols by abstracting away code standardly shared by protocols. |
||||||
|
|
||||||
|
* automate assigments of code indexes to messages |
||||||
|
* automate RLP decoding/encoding based on reflecting |
||||||
|
* provide the forever loop to read incoming messages |
||||||
|
* standardise error handling related to communication |
||||||
|
* standardised handshake negotiation |
||||||
|
* TODO: automatic generation of wire protocol specification for peers |
||||||
|
|
||||||
|
*/ |
||||||
|
package protocols |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"fmt" |
||||||
|
"reflect" |
||||||
|
"sync" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/p2p" |
||||||
|
) |
||||||
|
|
||||||
|
// error codes used by this protocol scheme
|
||||||
|
const ( |
||||||
|
ErrMsgTooLong = iota |
||||||
|
ErrDecode |
||||||
|
ErrWrite |
||||||
|
ErrInvalidMsgCode |
||||||
|
ErrInvalidMsgType |
||||||
|
ErrHandshake |
||||||
|
ErrNoHandler |
||||||
|
ErrHandler |
||||||
|
) |
||||||
|
|
||||||
|
// error description strings associated with the codes
|
||||||
|
var errorToString = map[int]string{ |
||||||
|
ErrMsgTooLong: "Message too long", |
||||||
|
ErrDecode: "Invalid message (RLP error)", |
||||||
|
ErrWrite: "Error sending message", |
||||||
|
ErrInvalidMsgCode: "Invalid message code", |
||||||
|
ErrInvalidMsgType: "Invalid message type", |
||||||
|
ErrHandshake: "Handshake error", |
||||||
|
ErrNoHandler: "No handler registered error", |
||||||
|
ErrHandler: "Message handler error", |
||||||
|
} |
||||||
|
|
||||||
|
/* |
||||||
|
Error implements the standard go error interface. |
||||||
|
Use: |
||||||
|
|
||||||
|
errorf(code, format, params ...interface{}) |
||||||
|
|
||||||
|
Prints as: |
||||||
|
|
||||||
|
<description>: <details> |
||||||
|
|
||||||
|
where description is given by code in errorToString |
||||||
|
and details is fmt.Sprintf(format, params...) |
||||||
|
|
||||||
|
exported field Code can be checked |
||||||
|
*/ |
||||||
|
type Error struct { |
||||||
|
Code int |
||||||
|
message string |
||||||
|
format string |
||||||
|
params []interface{} |
||||||
|
} |
||||||
|
|
||||||
|
func (e Error) Error() (message string) { |
||||||
|
if len(e.message) == 0 { |
||||||
|
name, ok := errorToString[e.Code] |
||||||
|
if !ok { |
||||||
|
panic("invalid message code") |
||||||
|
} |
||||||
|
e.message = name |
||||||
|
if e.format != "" { |
||||||
|
e.message += ": " + fmt.Sprintf(e.format, e.params...) |
||||||
|
} |
||||||
|
} |
||||||
|
return e.message |
||||||
|
} |
||||||
|
|
||||||
|
func errorf(code int, format string, params ...interface{}) *Error { |
||||||
|
return &Error{ |
||||||
|
Code: code, |
||||||
|
format: format, |
||||||
|
params: params, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Spec is a protocol specification including its name and version as well as
|
||||||
|
// the types of messages which are exchanged
|
||||||
|
type Spec struct { |
||||||
|
// Name is the name of the protocol, often a three-letter word
|
||||||
|
Name string |
||||||
|
|
||||||
|
// Version is the version number of the protocol
|
||||||
|
Version uint |
||||||
|
|
||||||
|
// MaxMsgSize is the maximum accepted length of the message payload
|
||||||
|
MaxMsgSize uint32 |
||||||
|
|
||||||
|
// Messages is a list of message data types which this protocol uses, with
|
||||||
|
// each message type being sent with its array index as the code (so
|
||||||
|
// [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes
|
||||||
|
// 0, 1 and 2 respectively)
|
||||||
|
// each message must have a single unique data type
|
||||||
|
Messages []interface{} |
||||||
|
|
||||||
|
initOnce sync.Once |
||||||
|
codes map[reflect.Type]uint64 |
||||||
|
types map[uint64]reflect.Type |
||||||
|
} |
||||||
|
|
||||||
|
func (s *Spec) init() { |
||||||
|
s.initOnce.Do(func() { |
||||||
|
s.codes = make(map[reflect.Type]uint64, len(s.Messages)) |
||||||
|
s.types = make(map[uint64]reflect.Type, len(s.Messages)) |
||||||
|
for i, msg := range s.Messages { |
||||||
|
code := uint64(i) |
||||||
|
typ := reflect.TypeOf(msg) |
||||||
|
if typ.Kind() == reflect.Ptr { |
||||||
|
typ = typ.Elem() |
||||||
|
} |
||||||
|
s.codes[typ] = code |
||||||
|
s.types[code] = typ |
||||||
|
} |
||||||
|
}) |
||||||
|
} |
||||||
|
|
||||||
|
// Length returns the number of message types in the protocol
|
||||||
|
func (s *Spec) Length() uint64 { |
||||||
|
return uint64(len(s.Messages)) |
||||||
|
} |
||||||
|
|
||||||
|
// GetCode returns the message code of a type, and boolean second argument is
|
||||||
|
// false if the message type is not found
|
||||||
|
func (s *Spec) GetCode(msg interface{}) (uint64, bool) { |
||||||
|
s.init() |
||||||
|
typ := reflect.TypeOf(msg) |
||||||
|
if typ.Kind() == reflect.Ptr { |
||||||
|
typ = typ.Elem() |
||||||
|
} |
||||||
|
code, ok := s.codes[typ] |
||||||
|
return code, ok |
||||||
|
} |
||||||
|
|
||||||
|
// NewMsg construct a new message type given the code
|
||||||
|
func (s *Spec) NewMsg(code uint64) (interface{}, bool) { |
||||||
|
s.init() |
||||||
|
typ, ok := s.types[code] |
||||||
|
if !ok { |
||||||
|
return nil, false |
||||||
|
} |
||||||
|
return reflect.New(typ).Interface(), true |
||||||
|
} |
||||||
|
|
||||||
|
// Peer represents a remote peer or protocol instance that is running on a peer connection with
|
||||||
|
// a remote peer
|
||||||
|
type Peer struct { |
||||||
|
*p2p.Peer // the p2p.Peer object representing the remote
|
||||||
|
rw p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from
|
||||||
|
spec *Spec |
||||||
|
} |
||||||
|
|
||||||
|
// NewPeer constructs a new peer
|
||||||
|
// this constructor is called by the p2p.Protocol#Run function
|
||||||
|
// the first two arguments are the arguments passed to p2p.Protocol.Run function
|
||||||
|
// the third argument is the Spec describing the protocol
|
||||||
|
func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer { |
||||||
|
return &Peer{ |
||||||
|
Peer: p, |
||||||
|
rw: rw, |
||||||
|
spec: spec, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Run starts the forever loop that handles incoming messages
|
||||||
|
// called within the p2p.Protocol#Run function
|
||||||
|
// the handler argument is a function which is called for each message received
|
||||||
|
// from the remote peer, a returned error causes the loop to exit
|
||||||
|
// resulting in disconnection
|
||||||
|
func (p *Peer) Run(handler func(msg interface{}) error) error { |
||||||
|
for { |
||||||
|
if err := p.handleIncoming(handler); err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Drop disconnects a peer.
|
||||||
|
// TODO: may need to implement protocol drop only? don't want to kick off the peer
|
||||||
|
// if they are useful for other protocols
|
||||||
|
func (p *Peer) Drop(err error) { |
||||||
|
p.Disconnect(p2p.DiscSubprotocolError) |
||||||
|
} |
||||||
|
|
||||||
|
// Send takes a message, encodes it in RLP, finds the right message code and sends the
|
||||||
|
// message off to the peer
|
||||||
|
// this low level call will be wrapped by libraries providing routed or broadcast sends
|
||||||
|
// but often just used to forward and push messages to directly connected peers
|
||||||
|
func (p *Peer) Send(msg interface{}) error { |
||||||
|
code, found := p.spec.GetCode(msg) |
||||||
|
if !found { |
||||||
|
return errorf(ErrInvalidMsgType, "%v", code) |
||||||
|
} |
||||||
|
return p2p.Send(p.rw, code, msg) |
||||||
|
} |
||||||
|
|
||||||
|
// handleIncoming(code)
|
||||||
|
// is called each cycle of the main forever loop that dispatches incoming messages
|
||||||
|
// if this returns an error the loop returns and the peer is disconnected with the error
|
||||||
|
// this generic handler
|
||||||
|
// * checks message size,
|
||||||
|
// * checks for out-of-range message codes,
|
||||||
|
// * handles decoding with reflection,
|
||||||
|
// * call handlers as callbacks
|
||||||
|
func (p *Peer) handleIncoming(handle func(msg interface{}) error) error { |
||||||
|
msg, err := p.rw.ReadMsg() |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
// make sure that the payload has been fully consumed
|
||||||
|
defer msg.Discard() |
||||||
|
|
||||||
|
if msg.Size > p.spec.MaxMsgSize { |
||||||
|
return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize) |
||||||
|
} |
||||||
|
|
||||||
|
val, ok := p.spec.NewMsg(msg.Code) |
||||||
|
if !ok { |
||||||
|
return errorf(ErrInvalidMsgCode, "%v", msg.Code) |
||||||
|
} |
||||||
|
if err := msg.Decode(val); err != nil { |
||||||
|
return errorf(ErrDecode, "<= %v: %v", msg, err) |
||||||
|
} |
||||||
|
|
||||||
|
// call the registered handler callbacks
|
||||||
|
// a registered callback take the decoded message as argument as an interface
|
||||||
|
// which the handler is supposed to cast to the appropriate type
|
||||||
|
// it is entirely safe not to check the cast in the handler since the handler is
|
||||||
|
// chosen based on the proper type in the first place
|
||||||
|
if err := handle(val); err != nil { |
||||||
|
return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err) |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// Handshake negotiates a handshake on the peer connection
|
||||||
|
// * arguments
|
||||||
|
// * context
|
||||||
|
// * the local handshake to be sent to the remote peer
|
||||||
|
// * funcion to be called on the remote handshake (can be nil)
|
||||||
|
// * expects a remote handshake back of the same type
|
||||||
|
// * the dialing peer needs to send the handshake first and then waits for remote
|
||||||
|
// * the listening peer waits for the remote handshake and then sends it
|
||||||
|
// returns the remote handshake and an error
|
||||||
|
func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (rhs interface{}, err error) { |
||||||
|
if _, ok := p.spec.GetCode(hs); !ok { |
||||||
|
return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs) |
||||||
|
} |
||||||
|
errc := make(chan error, 2) |
||||||
|
handle := func(msg interface{}) error { |
||||||
|
rhs = msg |
||||||
|
if verify != nil { |
||||||
|
return verify(rhs) |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
send := func() { errc <- p.Send(hs) } |
||||||
|
receive := func() { errc <- p.handleIncoming(handle) } |
||||||
|
|
||||||
|
go func() { |
||||||
|
if p.Inbound() { |
||||||
|
receive() |
||||||
|
send() |
||||||
|
} else { |
||||||
|
send() |
||||||
|
receive() |
||||||
|
} |
||||||
|
}() |
||||||
|
|
||||||
|
for i := 0; i < 2; i++ { |
||||||
|
select { |
||||||
|
case err = <-errc: |
||||||
|
case <-ctx.Done(): |
||||||
|
err = ctx.Err() |
||||||
|
} |
||||||
|
if err != nil { |
||||||
|
return nil, errorf(ErrHandshake, err.Error()) |
||||||
|
} |
||||||
|
} |
||||||
|
return rhs, nil |
||||||
|
} |
@ -0,0 +1,389 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package protocols |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"errors" |
||||||
|
"fmt" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/p2p" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" |
||||||
|
p2ptest "github.com/ethereum/go-ethereum/p2p/testing" |
||||||
|
) |
||||||
|
|
||||||
|
// handshake message type
|
||||||
|
type hs0 struct { |
||||||
|
C uint |
||||||
|
} |
||||||
|
|
||||||
|
// message to kill/drop the peer with nodeID
|
||||||
|
type kill struct { |
||||||
|
C discover.NodeID |
||||||
|
} |
||||||
|
|
||||||
|
// message to drop connection
|
||||||
|
type drop struct { |
||||||
|
} |
||||||
|
|
||||||
|
/// protoHandshake represents module-independent aspects of the protocol and is
|
||||||
|
// the first message peers send and receive as part the initial exchange
|
||||||
|
type protoHandshake struct { |
||||||
|
Version uint // local and remote peer should have identical version
|
||||||
|
NetworkID string // local and remote peer should have identical network id
|
||||||
|
} |
||||||
|
|
||||||
|
// checkProtoHandshake verifies local and remote protoHandshakes match
|
||||||
|
func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error { |
||||||
|
return func(rhs interface{}) error { |
||||||
|
remote := rhs.(*protoHandshake) |
||||||
|
if remote.NetworkID != testNetworkID { |
||||||
|
return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID) |
||||||
|
} |
||||||
|
|
||||||
|
if remote.Version != testVersion { |
||||||
|
return fmt.Errorf("%d (!= %d)", remote.Version, testVersion) |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// newProtocol sets up a protocol
|
||||||
|
// the run function here demonstrates a typical protocol using peerPool, handshake
|
||||||
|
// and messages registered to handlers
|
||||||
|
func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error { |
||||||
|
spec := &Spec{ |
||||||
|
Name: "test", |
||||||
|
Version: 42, |
||||||
|
MaxMsgSize: 10 * 1024, |
||||||
|
Messages: []interface{}{ |
||||||
|
protoHandshake{}, |
||||||
|
hs0{}, |
||||||
|
kill{}, |
||||||
|
drop{}, |
||||||
|
}, |
||||||
|
} |
||||||
|
return func(p *p2p.Peer, rw p2p.MsgReadWriter) error { |
||||||
|
peer := NewPeer(p, rw, spec) |
||||||
|
|
||||||
|
// initiate one-off protohandshake and check validity
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second) |
||||||
|
defer cancel() |
||||||
|
phs := &protoHandshake{42, "420"} |
||||||
|
hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID) |
||||||
|
_, err := peer.Handshake(ctx, phs, hsCheck) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
lhs := &hs0{42} |
||||||
|
// module handshake demonstrating a simple repeatable exchange of same-type message
|
||||||
|
hs, err := peer.Handshake(ctx, lhs, nil) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
if rmhs := hs.(*hs0); rmhs.C > lhs.C { |
||||||
|
return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C) |
||||||
|
} |
||||||
|
|
||||||
|
handle := func(msg interface{}) error { |
||||||
|
switch msg := msg.(type) { |
||||||
|
|
||||||
|
case *protoHandshake: |
||||||
|
return errors.New("duplicate handshake") |
||||||
|
|
||||||
|
case *hs0: |
||||||
|
rhs := msg |
||||||
|
if rhs.C > lhs.C { |
||||||
|
return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C) |
||||||
|
} |
||||||
|
lhs.C += rhs.C |
||||||
|
return peer.Send(lhs) |
||||||
|
|
||||||
|
case *kill: |
||||||
|
// demonstrates use of peerPool, killing another peer connection as a response to a message
|
||||||
|
id := msg.C |
||||||
|
pp.Get(id).Drop(errors.New("killed")) |
||||||
|
return nil |
||||||
|
|
||||||
|
case *drop: |
||||||
|
// for testing we can trigger self induced disconnect upon receiving drop message
|
||||||
|
return errors.New("dropped") |
||||||
|
|
||||||
|
default: |
||||||
|
return fmt.Errorf("unknown message type: %T", msg) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pp.Add(peer) |
||||||
|
defer pp.Remove(peer) |
||||||
|
return peer.Run(handle) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester { |
||||||
|
conf := adapters.RandomNodeConfig() |
||||||
|
return p2ptest.NewProtocolTester(t, conf.ID, 2, newProtocol(pp)) |
||||||
|
} |
||||||
|
|
||||||
|
func protoHandshakeExchange(id discover.NodeID, proto *protoHandshake) []p2ptest.Exchange { |
||||||
|
|
||||||
|
return []p2ptest.Exchange{ |
||||||
|
{ |
||||||
|
Expects: []p2ptest.Expect{ |
||||||
|
{ |
||||||
|
Code: 0, |
||||||
|
Msg: &protoHandshake{42, "420"}, |
||||||
|
Peer: id, |
||||||
|
}, |
||||||
|
}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
Triggers: []p2ptest.Trigger{ |
||||||
|
{ |
||||||
|
Code: 0, |
||||||
|
Msg: proto, |
||||||
|
Peer: id, |
||||||
|
}, |
||||||
|
}, |
||||||
|
}, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) { |
||||||
|
pp := p2ptest.NewTestPeerPool() |
||||||
|
s := protocolTester(t, pp) |
||||||
|
// TODO: make this more than one handshake
|
||||||
|
id := s.IDs[0] |
||||||
|
if err := s.TestExchanges(protoHandshakeExchange(id, proto)...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
var disconnects []*p2ptest.Disconnect |
||||||
|
for i, err := range errs { |
||||||
|
disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err}) |
||||||
|
} |
||||||
|
if err := s.TestDisconnected(disconnects...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestProtoHandshakeVersionMismatch(t *testing.T) { |
||||||
|
runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error())) |
||||||
|
} |
||||||
|
|
||||||
|
func TestProtoHandshakeNetworkIDMismatch(t *testing.T) { |
||||||
|
runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error())) |
||||||
|
} |
||||||
|
|
||||||
|
func TestProtoHandshakeSuccess(t *testing.T) { |
||||||
|
runProtoHandshake(t, &protoHandshake{42, "420"}) |
||||||
|
} |
||||||
|
|
||||||
|
func moduleHandshakeExchange(id discover.NodeID, resp uint) []p2ptest.Exchange { |
||||||
|
|
||||||
|
return []p2ptest.Exchange{ |
||||||
|
{ |
||||||
|
Expects: []p2ptest.Expect{ |
||||||
|
{ |
||||||
|
Code: 1, |
||||||
|
Msg: &hs0{42}, |
||||||
|
Peer: id, |
||||||
|
}, |
||||||
|
}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
Triggers: []p2ptest.Trigger{ |
||||||
|
{ |
||||||
|
Code: 1, |
||||||
|
Msg: &hs0{resp}, |
||||||
|
Peer: id, |
||||||
|
}, |
||||||
|
}, |
||||||
|
}, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func runModuleHandshake(t *testing.T, resp uint, errs ...error) { |
||||||
|
pp := p2ptest.NewTestPeerPool() |
||||||
|
s := protocolTester(t, pp) |
||||||
|
id := s.IDs[0] |
||||||
|
if err := s.TestExchanges(protoHandshakeExchange(id, &protoHandshake{42, "420"})...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
if err := s.TestExchanges(moduleHandshakeExchange(id, resp)...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
var disconnects []*p2ptest.Disconnect |
||||||
|
for i, err := range errs { |
||||||
|
disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err}) |
||||||
|
} |
||||||
|
if err := s.TestDisconnected(disconnects...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestModuleHandshakeError(t *testing.T) { |
||||||
|
runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42")) |
||||||
|
} |
||||||
|
|
||||||
|
func TestModuleHandshakeSuccess(t *testing.T) { |
||||||
|
runModuleHandshake(t, 42) |
||||||
|
} |
||||||
|
|
||||||
|
// testing complex interactions over multiple peers, relaying, dropping
|
||||||
|
func testMultiPeerSetup(a, b discover.NodeID) []p2ptest.Exchange { |
||||||
|
|
||||||
|
return []p2ptest.Exchange{ |
||||||
|
{ |
||||||
|
Label: "primary handshake", |
||||||
|
Expects: []p2ptest.Expect{ |
||||||
|
{ |
||||||
|
Code: 0, |
||||||
|
Msg: &protoHandshake{42, "420"}, |
||||||
|
Peer: a, |
||||||
|
}, |
||||||
|
{ |
||||||
|
Code: 0, |
||||||
|
Msg: &protoHandshake{42, "420"}, |
||||||
|
Peer: b, |
||||||
|
}, |
||||||
|
}, |
||||||
|
}, |
||||||
|
{ |
||||||
|
Label: "module handshake", |
||||||
|
Triggers: []p2ptest.Trigger{ |
||||||
|
{ |
||||||
|
Code: 0, |
||||||
|
Msg: &protoHandshake{42, "420"}, |
||||||
|
Peer: a, |
||||||
|
}, |
||||||
|
{ |
||||||
|
Code: 0, |
||||||
|
Msg: &protoHandshake{42, "420"}, |
||||||
|
Peer: b, |
||||||
|
}, |
||||||
|
}, |
||||||
|
Expects: []p2ptest.Expect{ |
||||||
|
{ |
||||||
|
Code: 1, |
||||||
|
Msg: &hs0{42}, |
||||||
|
Peer: a, |
||||||
|
}, |
||||||
|
{ |
||||||
|
Code: 1, |
||||||
|
Msg: &hs0{42}, |
||||||
|
Peer: b, |
||||||
|
}, |
||||||
|
}, |
||||||
|
}, |
||||||
|
|
||||||
|
{Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a}, |
||||||
|
{Code: 1, Msg: &hs0{41}, Peer: b}}}, |
||||||
|
{Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}}, |
||||||
|
{Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}} |
||||||
|
} |
||||||
|
|
||||||
|
func runMultiplePeers(t *testing.T, peer int, errs ...error) { |
||||||
|
pp := p2ptest.NewTestPeerPool() |
||||||
|
s := protocolTester(t, pp) |
||||||
|
|
||||||
|
if err := s.TestExchanges(testMultiPeerSetup(s.IDs[0], s.IDs[1])...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
// after some exchanges of messages, we can test state changes
|
||||||
|
// here this is simply demonstrated by the peerPool
|
||||||
|
// after the handshake negotiations peers must be added to the pool
|
||||||
|
// time.Sleep(1)
|
||||||
|
tick := time.NewTicker(10 * time.Millisecond) |
||||||
|
timeout := time.NewTimer(1 * time.Second) |
||||||
|
WAIT: |
||||||
|
for { |
||||||
|
select { |
||||||
|
case <-tick.C: |
||||||
|
if pp.Has(s.IDs[0]) { |
||||||
|
break WAIT |
||||||
|
} |
||||||
|
case <-timeout.C: |
||||||
|
t.Fatal("timeout") |
||||||
|
} |
||||||
|
} |
||||||
|
if !pp.Has(s.IDs[1]) { |
||||||
|
t.Fatalf("missing peer test-1: %v (%v)", pp, s.IDs) |
||||||
|
} |
||||||
|
|
||||||
|
// peer 0 sends kill request for peer with index <peer>
|
||||||
|
err := s.TestExchanges(p2ptest.Exchange{ |
||||||
|
Triggers: []p2ptest.Trigger{ |
||||||
|
{ |
||||||
|
Code: 2, |
||||||
|
Msg: &kill{s.IDs[peer]}, |
||||||
|
Peer: s.IDs[0], |
||||||
|
}, |
||||||
|
}, |
||||||
|
}) |
||||||
|
|
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
|
||||||
|
// the peer not killed sends a drop request
|
||||||
|
err = s.TestExchanges(p2ptest.Exchange{ |
||||||
|
Triggers: []p2ptest.Trigger{ |
||||||
|
{ |
||||||
|
Code: 3, |
||||||
|
Msg: &drop{}, |
||||||
|
Peer: s.IDs[(peer+1)%2], |
||||||
|
}, |
||||||
|
}, |
||||||
|
}) |
||||||
|
|
||||||
|
if err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
|
||||||
|
// check the actual discconnect errors on the individual peers
|
||||||
|
var disconnects []*p2ptest.Disconnect |
||||||
|
for i, err := range errs { |
||||||
|
disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err}) |
||||||
|
} |
||||||
|
if err := s.TestDisconnected(disconnects...); err != nil { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
// test if disconnected peers have been removed from peerPool
|
||||||
|
if pp.Has(s.IDs[peer]) { |
||||||
|
t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.IDs) |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
func TestMultiplePeersDropSelf(t *testing.T) { |
||||||
|
runMultiplePeers(t, 0, |
||||||
|
fmt.Errorf("subprotocol error"), |
||||||
|
fmt.Errorf("Message handler error: (msg code 3): dropped"), |
||||||
|
) |
||||||
|
} |
||||||
|
|
||||||
|
func TestMultiplePeersDropOther(t *testing.T) { |
||||||
|
runMultiplePeers(t, 1, |
||||||
|
fmt.Errorf("Message handler error: (msg code 3): dropped"), |
||||||
|
fmt.Errorf("subprotocol error"), |
||||||
|
) |
||||||
|
} |
@ -0,0 +1,67 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package testing |
||||||
|
|
||||||
|
import ( |
||||||
|
"fmt" |
||||||
|
"sync" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/log" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover" |
||||||
|
) |
||||||
|
|
||||||
|
type TestPeer interface { |
||||||
|
ID() discover.NodeID |
||||||
|
Drop(error) |
||||||
|
} |
||||||
|
|
||||||
|
// TestPeerPool is an example peerPool to demonstrate registration of peer connections
|
||||||
|
type TestPeerPool struct { |
||||||
|
lock sync.Mutex |
||||||
|
peers map[discover.NodeID]TestPeer |
||||||
|
} |
||||||
|
|
||||||
|
func NewTestPeerPool() *TestPeerPool { |
||||||
|
return &TestPeerPool{peers: make(map[discover.NodeID]TestPeer)} |
||||||
|
} |
||||||
|
|
||||||
|
func (self *TestPeerPool) Add(p TestPeer) { |
||||||
|
self.lock.Lock() |
||||||
|
defer self.lock.Unlock() |
||||||
|
log.Trace(fmt.Sprintf("pp add peer %v", p.ID())) |
||||||
|
self.peers[p.ID()] = p |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
func (self *TestPeerPool) Remove(p TestPeer) { |
||||||
|
self.lock.Lock() |
||||||
|
defer self.lock.Unlock() |
||||||
|
delete(self.peers, p.ID()) |
||||||
|
} |
||||||
|
|
||||||
|
func (self *TestPeerPool) Has(id discover.NodeID) bool { |
||||||
|
self.lock.Lock() |
||||||
|
defer self.lock.Unlock() |
||||||
|
_, ok := self.peers[id] |
||||||
|
return ok |
||||||
|
} |
||||||
|
|
||||||
|
func (self *TestPeerPool) Get(id discover.NodeID) TestPeer { |
||||||
|
self.lock.Lock() |
||||||
|
defer self.lock.Unlock() |
||||||
|
return self.peers[id] |
||||||
|
} |
@ -0,0 +1,280 @@ |
|||||||
|
// Copyright 2017 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package testing |
||||||
|
|
||||||
|
import ( |
||||||
|
"errors" |
||||||
|
"fmt" |
||||||
|
"sync" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/log" |
||||||
|
"github.com/ethereum/go-ethereum/p2p" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover" |
||||||
|
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" |
||||||
|
) |
||||||
|
|
||||||
|
var errTimedOut = errors.New("timed out") |
||||||
|
|
||||||
|
// ProtocolSession is a quasi simulation of a pivot node running
|
||||||
|
// a service and a number of dummy peers that can send (trigger) or
|
||||||
|
// receive (expect) messages
|
||||||
|
type ProtocolSession struct { |
||||||
|
Server *p2p.Server |
||||||
|
IDs []discover.NodeID |
||||||
|
adapter *adapters.SimAdapter |
||||||
|
events chan *p2p.PeerEvent |
||||||
|
} |
||||||
|
|
||||||
|
// Exchange is the basic units of protocol tests
|
||||||
|
// the triggers and expects in the arrays are run immediately and asynchronously
|
||||||
|
// thus one cannot have multiple expects for the SAME peer with DIFFERENT message types
|
||||||
|
// because it's unpredictable which expect will receive which message
|
||||||
|
// (with expect #1 and #2, messages might be sent #2 and #1, and both expects will complain about wrong message code)
|
||||||
|
// an exchange is defined on a session
|
||||||
|
type Exchange struct { |
||||||
|
Label string |
||||||
|
Triggers []Trigger |
||||||
|
Expects []Expect |
||||||
|
Timeout time.Duration |
||||||
|
} |
||||||
|
|
||||||
|
// Trigger is part of the exchange, incoming message for the pivot node
|
||||||
|
// sent by a peer
|
||||||
|
type Trigger struct { |
||||||
|
Msg interface{} // type of message to be sent
|
||||||
|
Code uint64 // code of message is given
|
||||||
|
Peer discover.NodeID // the peer to send the message to
|
||||||
|
Timeout time.Duration // timeout duration for the sending
|
||||||
|
} |
||||||
|
|
||||||
|
// Expect is part of an exchange, outgoing message from the pivot node
|
||||||
|
// received by a peer
|
||||||
|
type Expect struct { |
||||||
|
Msg interface{} // type of message to expect
|
||||||
|
Code uint64 // code of message is now given
|
||||||
|
Peer discover.NodeID // the peer that expects the message
|
||||||
|
Timeout time.Duration // timeout duration for receiving
|
||||||
|
} |
||||||
|
|
||||||
|
// Disconnect represents a disconnect event, used and checked by TestDisconnected
|
||||||
|
type Disconnect struct { |
||||||
|
Peer discover.NodeID // discconnected peer
|
||||||
|
Error error // disconnect reason
|
||||||
|
} |
||||||
|
|
||||||
|
// trigger sends messages from peers
|
||||||
|
func (self *ProtocolSession) trigger(trig Trigger) error { |
||||||
|
simNode, ok := self.adapter.GetNode(trig.Peer) |
||||||
|
if !ok { |
||||||
|
return fmt.Errorf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.IDs)) |
||||||
|
} |
||||||
|
mockNode, ok := simNode.Services()[0].(*mockNode) |
||||||
|
if !ok { |
||||||
|
return fmt.Errorf("trigger: peer %v is not a mock", trig.Peer) |
||||||
|
} |
||||||
|
|
||||||
|
errc := make(chan error) |
||||||
|
|
||||||
|
go func() { |
||||||
|
errc <- mockNode.Trigger(&trig) |
||||||
|
}() |
||||||
|
|
||||||
|
t := trig.Timeout |
||||||
|
if t == time.Duration(0) { |
||||||
|
t = 1000 * time.Millisecond |
||||||
|
} |
||||||
|
select { |
||||||
|
case err := <-errc: |
||||||
|
return err |
||||||
|
case <-time.After(t): |
||||||
|
return fmt.Errorf("timout expecting %v to send to peer %v", trig.Msg, trig.Peer) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// expect checks an expectation of a message sent out by the pivot node
|
||||||
|
func (self *ProtocolSession) expect(exps []Expect) error { |
||||||
|
// construct a map of expectations for each node
|
||||||
|
peerExpects := make(map[discover.NodeID][]Expect) |
||||||
|
for _, exp := range exps { |
||||||
|
if exp.Msg == nil { |
||||||
|
return errors.New("no message to expect") |
||||||
|
} |
||||||
|
peerExpects[exp.Peer] = append(peerExpects[exp.Peer], exp) |
||||||
|
} |
||||||
|
|
||||||
|
// construct a map of mockNodes for each node
|
||||||
|
mockNodes := make(map[discover.NodeID]*mockNode) |
||||||
|
for nodeID := range peerExpects { |
||||||
|
simNode, ok := self.adapter.GetNode(nodeID) |
||||||
|
if !ok { |
||||||
|
return fmt.Errorf("trigger: peer %v does not exist (1- %v)", nodeID, len(self.IDs)) |
||||||
|
} |
||||||
|
mockNode, ok := simNode.Services()[0].(*mockNode) |
||||||
|
if !ok { |
||||||
|
return fmt.Errorf("trigger: peer %v is not a mock", nodeID) |
||||||
|
} |
||||||
|
mockNodes[nodeID] = mockNode |
||||||
|
} |
||||||
|
|
||||||
|
// done chanell cancels all created goroutines when function returns
|
||||||
|
done := make(chan struct{}) |
||||||
|
defer close(done) |
||||||
|
// errc catches the first error from
|
||||||
|
errc := make(chan error) |
||||||
|
|
||||||
|
wg := &sync.WaitGroup{} |
||||||
|
wg.Add(len(mockNodes)) |
||||||
|
for nodeID, mockNode := range mockNodes { |
||||||
|
nodeID := nodeID |
||||||
|
mockNode := mockNode |
||||||
|
go func() { |
||||||
|
defer wg.Done() |
||||||
|
|
||||||
|
// Sum all Expect timeouts to give the maximum
|
||||||
|
// time for all expectations to finish.
|
||||||
|
// mockNode.Expect checks all received messages against
|
||||||
|
// a list of expected messages and timeout for each
|
||||||
|
// of them can not be checked separately.
|
||||||
|
var t time.Duration |
||||||
|
for _, exp := range peerExpects[nodeID] { |
||||||
|
if exp.Timeout == time.Duration(0) { |
||||||
|
t += 2000 * time.Millisecond |
||||||
|
} else { |
||||||
|
t += exp.Timeout |
||||||
|
} |
||||||
|
} |
||||||
|
alarm := time.NewTimer(t) |
||||||
|
defer alarm.Stop() |
||||||
|
|
||||||
|
// expectErrc is used to check if error returned
|
||||||
|
// from mockNode.Expect is not nil and to send it to
|
||||||
|
// errc only in that case.
|
||||||
|
// done channel will be closed when function
|
||||||
|
expectErrc := make(chan error) |
||||||
|
go func() { |
||||||
|
select { |
||||||
|
case expectErrc <- mockNode.Expect(peerExpects[nodeID]...): |
||||||
|
case <-done: |
||||||
|
case <-alarm.C: |
||||||
|
} |
||||||
|
}() |
||||||
|
|
||||||
|
select { |
||||||
|
case err := <-expectErrc: |
||||||
|
if err != nil { |
||||||
|
select { |
||||||
|
case errc <- err: |
||||||
|
case <-done: |
||||||
|
case <-alarm.C: |
||||||
|
errc <- errTimedOut |
||||||
|
} |
||||||
|
} |
||||||
|
case <-done: |
||||||
|
case <-alarm.C: |
||||||
|
errc <- errTimedOut |
||||||
|
} |
||||||
|
|
||||||
|
}() |
||||||
|
} |
||||||
|
|
||||||
|
go func() { |
||||||
|
wg.Wait() |
||||||
|
// close errc when all goroutines finish to return nill err from errc
|
||||||
|
close(errc) |
||||||
|
}() |
||||||
|
|
||||||
|
return <-errc |
||||||
|
} |
||||||
|
|
||||||
|
// TestExchanges tests a series of exchanges against the session
|
||||||
|
func (self *ProtocolSession) TestExchanges(exchanges ...Exchange) error { |
||||||
|
for i, e := range exchanges { |
||||||
|
if err := self.testExchange(e); err != nil { |
||||||
|
return fmt.Errorf("exchange #%d %q: %v", i, e.Label, err) |
||||||
|
} |
||||||
|
log.Trace(fmt.Sprintf("exchange #%d %q: run successfully", i, e.Label)) |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// testExchange tests a single Exchange.
|
||||||
|
// Default timeout value is 2 seconds.
|
||||||
|
func (self *ProtocolSession) testExchange(e Exchange) error { |
||||||
|
errc := make(chan error) |
||||||
|
done := make(chan struct{}) |
||||||
|
defer close(done) |
||||||
|
|
||||||
|
go func() { |
||||||
|
for _, trig := range e.Triggers { |
||||||
|
err := self.trigger(trig) |
||||||
|
if err != nil { |
||||||
|
errc <- err |
||||||
|
return |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
select { |
||||||
|
case errc <- self.expect(e.Expects): |
||||||
|
case <-done: |
||||||
|
} |
||||||
|
}() |
||||||
|
|
||||||
|
// time out globally or finish when all expectations satisfied
|
||||||
|
t := e.Timeout |
||||||
|
if t == 0 { |
||||||
|
t = 2000 * time.Millisecond |
||||||
|
} |
||||||
|
alarm := time.NewTimer(t) |
||||||
|
select { |
||||||
|
case err := <-errc: |
||||||
|
return err |
||||||
|
case <-alarm.C: |
||||||
|
return errTimedOut |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// TestDisconnected tests the disconnections given as arguments
|
||||||
|
// the disconnect structs describe what disconnect error is expected on which peer
|
||||||
|
func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error { |
||||||
|
expects := make(map[discover.NodeID]error) |
||||||
|
for _, disconnect := range disconnects { |
||||||
|
expects[disconnect.Peer] = disconnect.Error |
||||||
|
} |
||||||
|
|
||||||
|
timeout := time.After(time.Second) |
||||||
|
for len(expects) > 0 { |
||||||
|
select { |
||||||
|
case event := <-self.events: |
||||||
|
if event.Type != p2p.PeerEventTypeDrop { |
||||||
|
continue |
||||||
|
} |
||||||
|
expectErr, ok := expects[event.Peer] |
||||||
|
if !ok { |
||||||
|
continue |
||||||
|
} |
||||||
|
|
||||||
|
if !(expectErr == nil && event.Error == "" || expectErr != nil && expectErr.Error() == event.Error) { |
||||||
|
return fmt.Errorf("unexpected error on peer %v. expected '%v', got '%v'", event.Peer, expectErr, event.Error) |
||||||
|
} |
||||||
|
delete(expects, event.Peer) |
||||||
|
case <-timeout: |
||||||
|
return fmt.Errorf("timed out waiting for peers to disconnect") |
||||||
|
} |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue