p2p/simulations: fix a deadlock and clean up adapters (#17891)

This fixes a rare deadlock with the inproc adapter:

- A node is stopped, which acquires Network.lock.
- The protocol code being simulated (swarm/network in my case)
  waits for its goroutines to shut down.
- One of those goroutines calls into the simulation to add a peer,
  which waits for Network.lock.

The fix for the deadlock is really simple, just release the lock
before stopping the simulation node.

Other changes in this PR clean up the exec adapter so it reports
node startup errors better and remove the docker adapter because
it just adds overhead.

In the exec adapter, node information is now posted to a one-shot
server. This avoids log parsing and allows reporting startup
errors to the simulation host.

A small change in package node was needed because simulation
nodes use port zero. Node.{HTTP,WS}Endpoint now return the live
endpoints after startup by checking the TCP listener.
pull/17872/head
Felix Lange 6 years ago committed by GitHub
parent f951e23fb5
commit dcae0d348b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      node/node.go
  2. 12
      p2p/simulations/README.md
  3. 190
      p2p/simulations/adapters/docker.go
  4. 225
      p2p/simulations/adapters/exec.go
  5. 51
      p2p/simulations/adapters/ws.go
  6. 21
      p2p/simulations/adapters/ws_test.go
  7. 8
      p2p/simulations/examples/ping-pong.go
  8. 44
      p2p/simulations/network.go
  9. 19
      swarm/network/simulations/discovery/discovery_test.go

@ -549,11 +549,23 @@ func (n *Node) IPCEndpoint() string {
// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack. // HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack.
func (n *Node) HTTPEndpoint() string { func (n *Node) HTTPEndpoint() string {
n.lock.Lock()
defer n.lock.Unlock()
if n.httpListener != nil {
return n.httpListener.Addr().String()
}
return n.httpEndpoint return n.httpEndpoint
} }
// WSEndpoint retrieves the current WS endpoint used by the protocol stack. // WSEndpoint retrieves the current WS endpoint used by the protocol stack.
func (n *Node) WSEndpoint() string { func (n *Node) WSEndpoint() string {
n.lock.Lock()
defer n.lock.Unlock()
if n.wsListener != nil {
return n.wsListener.Addr().String()
}
return n.wsEndpoint return n.wsEndpoint
} }

@ -63,18 +63,6 @@ using the devp2p node stack rather than executing `main()`.
The nodes listen for devp2p connections and WebSocket RPC clients on random The nodes listen for devp2p connections and WebSocket RPC clients on random
localhost ports. localhost ports.
### DockerAdapter
The `DockerAdapter` is similar to the `ExecAdapter` but executes `docker run`
to run the node in a Docker container using a Docker image containing the
simulation binary at `/bin/p2p-node`.
The Docker image is built using `docker build` when the adapter is initialised,
meaning no prior setup is necessary other than having a working Docker client.
Each node listens on the external IP of the container and the default p2p and
RPC ports (`30303` and `8546` respectively).
## Network ## Network
A simulation network is created with an ID and default service (which is used A simulation network is created with an ID and default service (which is used

@ -1,190 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package adapters
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"github.com/docker/docker/pkg/reexec"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
)
var (
ErrLinuxOnly = errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)")
)
// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker
// containers.
//
// A Docker image is built which contains the current binary at /bin/p2p-node
// which when executed runs the underlying service (see the description
// of the execP2PNode function for more details)
type DockerAdapter struct {
ExecAdapter
}
// NewDockerAdapter builds the p2p-node Docker image containing the current
// binary and returns a DockerAdapter
func NewDockerAdapter() (*DockerAdapter, error) {
// Since Docker containers run on Linux and this adapter runs the
// current binary in the container, it must be compiled for Linux.
//
// It is reasonable to require this because the caller can just
// compile the current binary in a Docker container.
if runtime.GOOS != "linux" {
return nil, ErrLinuxOnly
}
if err := buildDockerImage(); err != nil {
return nil, err
}
return &DockerAdapter{
ExecAdapter{
nodes: make(map[enode.ID]*ExecNode),
},
}, nil
}
// Name returns the name of the adapter for logging purposes
func (d *DockerAdapter) Name() string {
return "docker-adapter"
}
// NewNode returns a new DockerNode using the given config
func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
if len(config.Services) == 0 {
return nil, errors.New("node must have at least one service")
}
for _, service := range config.Services {
if _, exists := serviceFuncs[service]; !exists {
return nil, fmt.Errorf("unknown node service %q", service)
}
}
// generate the config
conf := &execNodeConfig{
Stack: node.DefaultConfig,
Node: config,
}
conf.Stack.DataDir = "/data"
conf.Stack.WSHost = "0.0.0.0"
conf.Stack.WSOrigins = []string{"*"}
conf.Stack.WSExposeAll = true
conf.Stack.P2P.EnableMsgEvents = false
conf.Stack.P2P.NoDiscovery = true
conf.Stack.P2P.NAT = nil
conf.Stack.NoUSB = true
// listen on all interfaces on a given port, which we set when we
// initialise NodeConfig (usually a random port)
conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port)
node := &DockerNode{
ExecNode: ExecNode{
ID: config.ID,
Config: conf,
adapter: &d.ExecAdapter,
},
}
node.newCmd = node.dockerCommand
d.ExecAdapter.nodes[node.ID] = &node.ExecNode
return node, nil
}
// DockerNode wraps an ExecNode but exec's the current binary in a docker
// container rather than locally
type DockerNode struct {
ExecNode
}
// dockerCommand returns a command which exec's the binary in a Docker
// container.
//
// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
// variable to the container using the --env flag.
func (n *DockerNode) dockerCommand() *exec.Cmd {
return exec.Command(
"sh", "-c",
fmt.Sprintf(
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(),
),
)
}
// dockerImage is the name of the Docker image which gets built to run the
// simulation node
const dockerImage = "p2p-node"
// buildDockerImage builds the Docker image which is used to run the simulation
// node in a Docker container.
//
// It adds the current binary as "p2p-node" so that it runs execP2PNode
// when executed.
func buildDockerImage() error {
// create a directory to use as the build context
dir, err := ioutil.TempDir("", "p2p-docker")
if err != nil {
return err
}
defer os.RemoveAll(dir)
// copy the current binary into the build context
bin, err := os.Open(reexec.Self())
if err != nil {
return err
}
defer bin.Close()
dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return err
}
defer dst.Close()
if _, err := io.Copy(dst, bin); err != nil {
return err
}
// create the Dockerfile
dockerfile := []byte(`
FROM ubuntu:16.04
RUN mkdir /data
ADD self.bin /bin/p2p-node
`)
if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil {
return err
}
// run 'docker build'
cmd := exec.Command("docker", "build", "-t", dockerImage, dir)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("error building docker image: %s", err)
}
return nil
}

@ -17,7 +17,7 @@
package adapters package adapters
import ( import (
"bufio" "bytes"
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"encoding/json" "encoding/json"
@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"net/http"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@ -43,12 +44,14 @@ import (
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the func init() {
// current binary as a child process. // Register a reexec function to start a simulation node when the current binary is
// // executed as "p2p-node" (rather than whataver the main() function would normally do).
// An init hook is used so that the child process executes the node services reexec.Register("p2p-node", execP2PNode)
// (rather than whataver the main() function would normally do), see the }
// execP2PNode function for more information.
// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary
// as a child process.
type ExecAdapter struct { type ExecAdapter struct {
// BaseDir is the directory under which the data directories for each // BaseDir is the directory under which the data directories for each
// simulation node are created. // simulation node are created.
@ -150,15 +153,13 @@ func (n *ExecNode) Client() (*rpc.Client, error) {
} }
// Start exec's the node passing the ID and service as command line arguments // Start exec's the node passing the ID and service as command line arguments
// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment // and the node config encoded as JSON in an environment variable.
// variable
func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
if n.Cmd != nil { if n.Cmd != nil {
return errors.New("already started") return errors.New("already started")
} }
defer func() { defer func() {
if err != nil { if err != nil {
log.Error("node failed to start", "err", err)
n.Stop() n.Stop()
} }
}() }()
@ -175,59 +176,78 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
return fmt.Errorf("error generating node config: %s", err) return fmt.Errorf("error generating node config: %s", err)
} }
// use a pipe for stderr so we can both copy the node's stderr to // start the one-shot server that waits for startup information
// os.Stderr and read the WebSocket address from the logs ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
stderrR, stderrW := io.Pipe() defer cancel()
stderr := io.MultiWriter(os.Stderr, stderrW) statusURL, statusC := n.waitForStartupJSON(ctx)
// start the node // start the node
cmd := n.newCmd() cmd := n.newCmd()
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = stderr cmd.Stderr = os.Stderr
cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) cmd.Env = append(os.Environ(),
envStatusURL+"="+statusURL,
envNodeConfig+"="+string(confData),
)
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err) return fmt.Errorf("error starting node: %s", err)
} }
n.Cmd = cmd n.Cmd = cmd
// read the WebSocket address from the stderr logs // read the WebSocket address from the stderr logs
var wsAddr string status := <-statusC
wsAddrC := make(chan string) if status.Err != "" {
go func() { return errors.New(status.Err)
s := bufio.NewScanner(stderrR)
for s.Scan() {
if strings.Contains(s.Text(), "WebSocket endpoint opened") {
wsAddrC <- wsAddrPattern.FindString(s.Text())
}
}
}()
select {
case wsAddr = <-wsAddrC:
if wsAddr == "" {
return errors.New("failed to read WebSocket address from stderr")
}
case <-time.After(10 * time.Second):
return errors.New("timed out waiting for WebSocket address on stderr")
} }
client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost")
// create the RPC client and load the node info
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := rpc.DialWebsocket(ctx, wsAddr, "")
if err != nil { if err != nil {
return fmt.Errorf("error dialing rpc websocket: %s", err) return fmt.Errorf("can't connect to RPC server: %v", err)
} }
var info p2p.NodeInfo
if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
return fmt.Errorf("error getting node info: %s", err)
}
n.client = client
n.wsAddr = wsAddr
n.Info = &info
// node ready :)
n.client = client
n.wsAddr = status.WSEndpoint
n.Info = status.NodeInfo
return nil return nil
} }
// waitForStartupJSON runs a one-shot HTTP server to receive a startup report.
func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) {
var (
ch = make(chan nodeStartupJSON, 1)
quitOnce sync.Once
srv http.Server
)
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
ch <- nodeStartupJSON{Err: err.Error()}
return "", ch
}
quit := func(status nodeStartupJSON) {
quitOnce.Do(func() {
l.Close()
ch <- status
})
}
srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var status nodeStartupJSON
if err := json.NewDecoder(r.Body).Decode(&status); err != nil {
status.Err = fmt.Sprintf("can't decode startup report: %v", err)
}
quit(status)
})
// Run the HTTP server, but don't wait forever and shut it down
// if the context is canceled.
go srv.Serve(l)
go func() {
<-ctx.Done()
quit(nodeStartupJSON{Err: "didn't get startup report"})
}()
url := "http://" + l.Addr().String()
return url, ch
}
// execCommand returns a command which runs the node locally by exec'ing // execCommand returns a command which runs the node locally by exec'ing
// the current binary but setting argv[0] to "p2p-node" so that the child // the current binary but setting argv[0] to "p2p-node" so that the child
// runs execP2PNode // runs execP2PNode
@ -318,12 +338,6 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot") return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
} }
func init() {
// register a reexec function to start a devp2p node when the current
// binary is executed as "p2p-node"
reexec.Register("p2p-node", execP2PNode)
}
// execNodeConfig is used to serialize the node configuration so it can be // execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable // passed to the child process as a JSON encoded environment variable
type execNodeConfig struct { type execNodeConfig struct {
@ -333,55 +347,69 @@ type execNodeConfig struct {
PeerAddrs map[string]string `json:"peer_addrs,omitempty"` PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
} }
// ExternalIP gets an external IP address so that Enode URL is usable // execP2PNode starts a simulation node when the current binary is executed with
func ExternalIP() net.IP {
addrs, err := net.InterfaceAddrs()
if err != nil {
log.Crit("error getting IP address", "err", err)
}
for _, addr := range addrs {
if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() {
return ip.IP
}
}
log.Warn("unable to determine explicit IP address, falling back to loopback")
return net.IP{127, 0, 0, 1}
}
// execP2PNode starts a devp2p node when the current binary is executed with
// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
// and the node config from the _P2P_NODE_CONFIG environment variable // and the node config from an environment variable.
func execP2PNode() { func execP2PNode() {
glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
glogger.Verbosity(log.LvlInfo) glogger.Verbosity(log.LvlInfo)
log.Root().SetHandler(glogger) log.Root().SetHandler(glogger)
statusURL := os.Getenv(envStatusURL)
if statusURL == "" {
log.Crit("missing " + envStatusURL)
}
// Start the node and gather startup report.
var status nodeStartupJSON
stack, stackErr := startExecNodeStack()
if stackErr != nil {
status.Err = stackErr.Error()
} else {
status.WSEndpoint = "ws://" + stack.WSEndpoint()
status.NodeInfo = stack.Server().NodeInfo()
}
// Send status to the host.
statusJSON, _ := json.Marshal(status)
if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil {
log.Crit("Can't post startup info", "url", statusURL, "err", err)
}
if stackErr != nil {
os.Exit(1)
}
// Stop the stack if we get a SIGTERM signal.
go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
defer signal.Stop(sigc)
<-sigc
log.Info("Received SIGTERM, shutting down...")
stack.Stop()
}()
stack.Wait() // Wait for the stack to exit.
}
func startExecNodeStack() (*node.Node, error) {
// read the services from argv // read the services from argv
serviceNames := strings.Split(os.Args[1], ",") serviceNames := strings.Split(os.Args[1], ",")
// decode the config // decode the config
confEnv := os.Getenv("_P2P_NODE_CONFIG") confEnv := os.Getenv(envNodeConfig)
if confEnv == "" { if confEnv == "" {
log.Crit("missing _P2P_NODE_CONFIG") return nil, fmt.Errorf("missing " + envNodeConfig)
} }
var conf execNodeConfig var conf execNodeConfig
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err)
} }
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr
}
if conf.Stack.WSHost == "0.0.0.0" {
conf.Stack.WSHost = ExternalIP().String()
}
// initialize the devp2p stack // initialize the devp2p stack
stack, err := node.New(&conf.Stack) stack, err := node.New(&conf.Stack)
if err != nil { if err != nil {
log.Crit("error creating node stack", "err", err) return nil, fmt.Errorf("error creating node stack: %v", err)
} }
// register the services, collecting them into a map so we can wrap // register the services, collecting them into a map so we can wrap
@ -390,7 +418,7 @@ func execP2PNode() {
for _, name := range serviceNames { for _, name := range serviceNames {
serviceFunc, exists := serviceFuncs[name] serviceFunc, exists := serviceFuncs[name]
if !exists { if !exists {
log.Crit("unknown node service", "name", name) return nil, fmt.Errorf("unknown node service %q", err)
} }
constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
ctx := &ServiceContext{ ctx := &ServiceContext{
@ -409,34 +437,35 @@ func execP2PNode() {
return service, nil return service, nil
} }
if err := stack.Register(constructor); err != nil { if err := stack.Register(constructor); err != nil {
log.Crit("error starting service", "name", name, "err", err) return stack, fmt.Errorf("error registering service %q: %v", name, err)
} }
} }
// register the snapshot service // register the snapshot service
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return &snapshotService{services}, nil return &snapshotService{services}, nil
}); err != nil { })
log.Crit("error starting snapshot service", "err", err) if err != nil {
return stack, fmt.Errorf("error starting snapshot service: %v", err)
} }
// start the stack // start the stack
if err := stack.Start(); err != nil { if err = stack.Start(); err != nil {
log.Crit("error stating node stack", "err", err) err = fmt.Errorf("error starting stack: %v", err)
} }
return stack, err
}
// stop the stack if we get a SIGTERM signal const (
go func() { envStatusURL = "_P2P_STATUS_URL"
sigc := make(chan os.Signal, 1) envNodeConfig = "_P2P_NODE_CONFIG"
signal.Notify(sigc, syscall.SIGTERM) )
defer signal.Stop(sigc)
<-sigc
log.Info("Received SIGTERM, shutting down...")
stack.Stop()
}()
// wait for the stack to exit // nodeStartupJSON is sent to the simulation host after startup.
stack.Wait() type nodeStartupJSON struct {
Err string
WSEndpoint string
NodeInfo *p2p.NodeInfo
} }
// snapshotService is a node.Service which wraps a list of services and // snapshotService is a node.Service which wraps a list of services and

@ -1,51 +0,0 @@
package adapters
import (
"bufio"
"errors"
"io"
"regexp"
"strings"
"time"
)
// wsAddrPattern is a regex used to read the WebSocket address from the node's
// log
var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`)
func matchWSAddr(str string) (string, bool) {
if !strings.Contains(str, "WebSocket endpoint opened") {
return "", false
}
return wsAddrPattern.FindString(str), true
}
// findWSAddr scans through reader r, looking for the log entry with
// WebSocket address information.
func findWSAddr(r io.Reader, timeout time.Duration) (string, error) {
ch := make(chan string)
go func() {
s := bufio.NewScanner(r)
for s.Scan() {
addr, ok := matchWSAddr(s.Text())
if ok {
ch <- addr
}
}
close(ch)
}()
var wsAddr string
select {
case wsAddr = <-ch:
if wsAddr == "" {
return "", errors.New("empty result")
}
case <-time.After(timeout):
return "", errors.New("timed out")
}
return wsAddr, nil
}

@ -1,21 +0,0 @@
package adapters
import (
"bytes"
"testing"
"time"
)
func TestFindWSAddr(t *testing.T) {
line := `t=2018-05-02T19:00:45+0200 lvl=info msg="WebSocket endpoint opened" node.id=26c65a606d1125a44695bc08573190d047152b6b9a776ccbbe593e90f91444d9c1ebdadac6a775ad9fdd0923468a1d698ed3a842c1fb89c1bc0f9d4801f8c39c url=ws://127.0.0.1:59975`
buf := bytes.NewBufferString(line)
got, err := findWSAddr(buf, 10*time.Second)
if err != nil {
t.Fatalf("Failed to find addr: %v", err)
}
expected := `ws://127.0.0.1:59975`
if got != expected {
t.Fatalf("Expected to get '%s', but got '%s'", expected, got)
}
}

@ -70,14 +70,6 @@ func main() {
log.Info("using exec adapter", "tmpdir", tmpdir) log.Info("using exec adapter", "tmpdir", tmpdir)
adapter = adapters.NewExecAdapter(tmpdir) adapter = adapters.NewExecAdapter(tmpdir)
case "docker":
log.Info("using docker adapter")
var err error
adapter, err = adapters.NewDockerAdapter()
if err != nil {
log.Crit("error creating docker adapter", "err", err)
}
default: default:
log.Crit(fmt.Sprintf("unknown node adapter %q", *adapterType)) log.Crit(fmt.Sprintf("unknown node adapter %q", *adapterType))
} }

@ -116,7 +116,7 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
Node: adapterNode, Node: adapterNode,
Config: conf, Config: conf,
} }
log.Trace(fmt.Sprintf("node %v created", conf.ID)) log.Trace("Node created", "id", conf.ID)
net.nodeMap[conf.ID] = len(net.Nodes) net.nodeMap[conf.ID] = len(net.Nodes)
net.Nodes = append(net.Nodes, node) net.Nodes = append(net.Nodes, node)
@ -167,6 +167,7 @@ func (net *Network) Start(id enode.ID) error {
func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error { func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
net.lock.Lock() net.lock.Lock()
defer net.lock.Unlock() defer net.lock.Unlock()
node := net.getNode(id) node := net.getNode(id)
if node == nil { if node == nil {
return fmt.Errorf("node %v does not exist", id) return fmt.Errorf("node %v does not exist", id)
@ -174,13 +175,13 @@ func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte)
if node.Up { if node.Up {
return fmt.Errorf("node %v already up", id) return fmt.Errorf("node %v already up", id)
} }
log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, net.nodeAdapter.Name())) log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
if err := node.Start(snapshots); err != nil { if err := node.Start(snapshots); err != nil {
log.Warn(fmt.Sprintf("start up failed: %v", err)) log.Warn("Node startup failed", "id", id, "err", err)
return err return err
} }
node.Up = true node.Up = true
log.Info(fmt.Sprintf("started node %v: %v", id, node.Up)) log.Info("Started node", "id", id)
net.events.Send(NewEvent(node)) net.events.Send(NewEvent(node))
@ -209,7 +210,6 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
defer net.lock.Unlock() defer net.lock.Unlock()
node := net.getNode(id) node := net.getNode(id)
if node == nil { if node == nil {
log.Error("Can not find node for id", "id", id)
return return
} }
node.Up = false node.Up = false
@ -240,7 +240,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
case err := <-sub.Err(): case err := <-sub.Err():
if err != nil { if err != nil {
log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err) log.Error("Error in peer event subscription", "id", id, "err", err)
} }
return return
} }
@ -250,7 +250,6 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
// Stop stops the node with the given ID // Stop stops the node with the given ID
func (net *Network) Stop(id enode.ID) error { func (net *Network) Stop(id enode.ID) error {
net.lock.Lock() net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id) node := net.getNode(id)
if node == nil { if node == nil {
return fmt.Errorf("node %v does not exist", id) return fmt.Errorf("node %v does not exist", id)
@ -258,12 +257,17 @@ func (net *Network) Stop(id enode.ID) error {
if !node.Up { if !node.Up {
return fmt.Errorf("node %v already down", id) return fmt.Errorf("node %v already down", id)
} }
if err := node.Stop(); err != nil {
return err
}
node.Up = false node.Up = false
log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up)) net.lock.Unlock()
err := node.Stop()
if err != nil {
net.lock.Lock()
node.Up = true
net.lock.Unlock()
return err
}
log.Info("Stopped node", "id", id, "err", err)
net.events.Send(ControlEvent(node)) net.events.Send(ControlEvent(node))
return nil return nil
} }
@ -271,7 +275,7 @@ func (net *Network) Stop(id enode.ID) error {
// Connect connects two nodes together by calling the "admin_addPeer" RPC // Connect connects two nodes together by calling the "admin_addPeer" RPC
// method on the "one" node so that it connects to the "other" node // method on the "one" node so that it connects to the "other" node
func (net *Network) Connect(oneID, otherID enode.ID) error { func (net *Network) Connect(oneID, otherID enode.ID) error {
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
conn, err := net.InitConn(oneID, otherID) conn, err := net.InitConn(oneID, otherID)
if err != nil { if err != nil {
return err return err
@ -481,10 +485,10 @@ func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
err = conn.nodesUp() err = conn.nodesUp()
if err != nil { if err != nil {
log.Trace(fmt.Sprintf("nodes not up: %v", err)) log.Trace("Nodes not up", "err", err)
return nil, fmt.Errorf("nodes not up: %v", err) return nil, fmt.Errorf("nodes not up: %v", err)
} }
log.Debug("InitConn - connection initiated") log.Debug("Connection initiated", "id", oneID, "other", otherID)
conn.initiated = time.Now() conn.initiated = time.Now()
return conn, nil return conn, nil
} }
@ -492,9 +496,9 @@ func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
// Shutdown stops all nodes in the network and closes the quit channel // Shutdown stops all nodes in the network and closes the quit channel
func (net *Network) Shutdown() { func (net *Network) Shutdown() {
for _, node := range net.Nodes { for _, node := range net.Nodes {
log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString())) log.Debug("Stopping node", "id", node.ID())
if err := node.Stop(); err != nil { if err := node.Stop(); err != nil {
log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err) log.Warn("Can't stop node", "id", node.ID(), "err", err)
} }
} }
close(net.quitc) close(net.quitc)
@ -708,18 +712,18 @@ func (net *Network) Subscribe(events chan *Event) {
} }
func (net *Network) executeControlEvent(event *Event) { func (net *Network) executeControlEvent(event *Event) {
log.Trace("execute control event", "type", event.Type, "event", event) log.Trace("Executing control event", "type", event.Type, "event", event)
switch event.Type { switch event.Type {
case EventTypeNode: case EventTypeNode:
if err := net.executeNodeEvent(event); err != nil { if err := net.executeNodeEvent(event); err != nil {
log.Error("error executing node event", "event", event, "err", err) log.Error("Error executing node event", "event", event, "err", err)
} }
case EventTypeConn: case EventTypeConn:
if err := net.executeConnEvent(event); err != nil { if err := net.executeConnEvent(event); err != nil {
log.Error("error executing conn event", "event", event, "err", err) log.Error("Error executing conn event", "event", event, "err", err)
} }
case EventTypeMsg: case EventTypeMsg:
log.Warn("ignoring control msg event") log.Warn("Ignoring control msg event")
} }
} }

@ -125,22 +125,6 @@ func BenchmarkDiscovery_64_4(b *testing.B) { benchmarkDiscovery(b, 64, 4) }
func BenchmarkDiscovery_128_4(b *testing.B) { benchmarkDiscovery(b, 128, 4) } func BenchmarkDiscovery_128_4(b *testing.B) { benchmarkDiscovery(b, 128, 4) }
func BenchmarkDiscovery_256_4(b *testing.B) { benchmarkDiscovery(b, 256, 4) } func BenchmarkDiscovery_256_4(b *testing.B) { benchmarkDiscovery(b, 256, 4) }
func TestDiscoverySimulationDockerAdapter(t *testing.T) {
testDiscoverySimulationDockerAdapter(t, *nodeCount, *initCount)
}
func testDiscoverySimulationDockerAdapter(t *testing.T, nodes, conns int) {
adapter, err := adapters.NewDockerAdapter()
if err != nil {
if err == adapters.ErrLinuxOnly {
t.Skip(err)
} else {
t.Fatal(err)
}
}
testDiscoverySimulation(t, nodes, conns, adapter)
}
func TestDiscoverySimulationExecAdapter(t *testing.T) { func TestDiscoverySimulationExecAdapter(t *testing.T) {
testDiscoverySimulationExecAdapter(t, *nodeCount, *initCount) testDiscoverySimulationExecAdapter(t, *nodeCount, *initCount)
} }
@ -545,8 +529,7 @@ func triggerChecks(trigger chan enode.ID, net *simulations.Network, id enode.ID)
} }
func newService(ctx *adapters.ServiceContext) (node.Service, error) { func newService(ctx *adapters.ServiceContext) (node.Service, error) {
node := enode.NewV4(&ctx.Config.PrivateKey.PublicKey, adapters.ExternalIP(), int(ctx.Config.Port), int(ctx.Config.Port)) addr := network.NewAddr(ctx.Config.Node())
addr := network.NewAddr(node)
kp := network.NewKadParams() kp := network.NewKadParams()
kp.MinProxBinSize = testMinProxBinSize kp.MinProxBinSize = testMinProxBinSize

Loading…
Cancel
Save