Merge netsim mig to master (#17241)

* swarm: merged stream-tests migration to develop

* swarm/network: expose simulation RandomUpNode to use in stream tests

* swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest

* swarm: enforce waitkademlia for snapshot tests

* swarm: fixed syncer tests and snapshot_sync_test

* swarm: linting of simulation package

* swarm: address review comments

* swarm/network/stream: fix delivery_test bugs and refactor

* swarm/network/stream: addressed PR comments @janos

* swarm/network/stream: enforce waitKademlia, improve TestIntervals

* swarm/network/stream: TestIntervals not waiting for chunk to be stored
Branch: pull/17285/head
Authored by holisticode, committed by Balint Gabor
parent 3ea8ac6a9a
commit d6efa69187
Changed files:
  1. swarm/network/simulation/bucket.go (2)
  2. swarm/network/simulation/connect.go (4)
  3. swarm/network/simulation/events.go (11)
  4. swarm/network/simulation/http.go (11)
  5. swarm/network/simulation/http_test.go (7)
  6. swarm/network/simulation/node.go (19)
  7. swarm/network/simulation/service.go (2)
  8. swarm/network/stream/common_test.go (352)
  9. swarm/network/stream/delivery_test.go (460)
  10. swarm/network/stream/intervals_test.go (336)
  11. swarm/network/stream/snapshot_retrieval_test.go (795)
  12. swarm/network/stream/snapshot_sync_test.go (721)
  13. swarm/network/stream/syncer_test.go (281)
  14. swarm/network/stream/testing/testing.go (293)
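All stream tests below are rewritten against the swarm/network/simulation framework. As orientation, here is a hypothetical, condensed skeleton of that pattern (not part of the commit): a ServiceFunc map passed to simulation.New, per-node resources kept in the node's bucket, topology built up front, and the test body run via sim.Run. The noopService type and the test name are placeholders; real tests return their stream *Registry from the ServiceFunc.

package stream

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
)

// noopService is a placeholder node.Service; real tests return their *Registry here.
type noopService struct{}

func (s *noopService) Protocols() []p2p.Protocol { return nil }
func (s *noopService) APIs() []rpc.API           { return nil }
func (s *noopService) Start(_ *p2p.Server) error { return nil }
func (s *noopService) Stop() error               { return nil }

func TestSimulationSkeleton(t *testing.T) {
	// Declare the services every simulation node runs. Per-node resources
	// (stores, registries, file stores) go into the bucket; the returned
	// cleanup func releases them when the node shuts down.
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
			return &noopService{}, func() {}, nil
		},
	})
	defer sim.Close()

	// Build the topology up front: a chain here; snapshot tests call
	// sim.UploadSnapshot instead.
	if _, err := sim.AddNodesAndConnectChain(2); err != nil {
		t.Fatal(err)
	}

	// Run the test body against the live network. Real tests call
	// sim.WaitTillHealthy first and read per-node items via sim.NodeItem.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		_ = sim.UpNodeIDs()
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}
}

The per-node bucket replaces the package-level maps (stores, deliveries, registries) that the old common_test.go maintained.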

@@ -43,7 +43,7 @@ func (s *Simulation) SetNodeItem(id discover.NodeID, key interface{}, value inte
s.buckets[id].Store(key, value)
}
// NodeItems returns a map of items from all nodes that are all set under the
// NodesItems returns a map of items from all nodes that are all set under the
// same BucketKey.
func (s *Simulation) NodesItems(key interface{}) (values map[discover.NodeID]interface{}) {
s.mu.RLock()

@@ -54,7 +54,7 @@ func (s *Simulation) ConnectToLastNode(id discover.NodeID) (err error) {
// ConnectToRandomNode connects the node with provided NodeID
// to a random node that is up.
func (s *Simulation) ConnectToRandomNode(id discover.NodeID) (err error) {
n := s.randomUpNode(id)
n := s.RandomUpNode(id)
if n == nil {
return ErrNodeNotFound
}
@@ -135,7 +135,7 @@ func (s *Simulation) ConnectNodesStar(id discover.NodeID, ids []discover.NodeID)
return nil
}
// ConnectNodesStar connects all nodes in a star topology
// ConnectNodesStarPivot connects all nodes in a star topology
// with the center at already set pivot node.
// If ids argument is nil, all nodes that are up will be connected.
func (s *Simulation) ConnectNodesStarPivot(ids []discover.NodeID) (err error) {

@@ -18,6 +18,7 @@ package simulation
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent {
eventC := make(chan PeerEvent)
// wait group to make sure all subscriptions to admin peerEvents are established
// before this function returns.
var subsWG sync.WaitGroup
for _, id := range ids {
s.shutdownWG.Add(1)
subsWG.Add(1)
go func(id discover.NodeID) {
defer s.shutdownWG.Done()
client, err := s.Net.GetNode(id).Client()
if err != nil {
subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
if err != nil {
subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
defer sub.Unsubscribe()
subsWG.Done()
for {
select {
case <-ctx.Done():
@@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt
}(id)
}
// wait all subscriptions
subsWG.Wait()
return eventC
}
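Since PeerEvents now blocks until every admin peerEvents subscription is established, a caller can connect or drop peers right after it returns without missing events. A minimal consumer sketch, assuming a running *simulation.Simulation named sim and the go-ethereum log package; the filter mirrors the disconnection watcher used by the stream tests below.

// Watch for peer drops on all nodes; the subscriptions are already in place
// when PeerEvents returns, so no early events are lost.
disconnections := sim.PeerEvents(
	context.Background(),
	sim.NodeIDs(),
	simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
go func() {
	for d := range disconnections {
		if d.Error != nil {
			log.Error("peer event error", "node", d.NodeID, "err", d.Error)
			continue
		}
		log.Debug("peer dropped", "node", d.NodeID, "peer", d.Event.Peer)
	}
}()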

@@ -29,7 +29,7 @@ var (
DefaultHTTPSimAddr = ":8888"
)
//`With`(builder) pattern constructor for Simulation to
//WithServer implements the builder pattern constructor for Simulation to
//start with a HTTP server
func (s *Simulation) WithServer(addr string) *Simulation {
//assign default addr if nothing provided
@@ -46,7 +46,12 @@ func (s *Simulation) WithServer(addr string) *Simulation {
Addr: addr,
Handler: s.handler,
}
go s.httpSrv.ListenAndServe()
go func() {
err := s.httpSrv.ListenAndServe()
if err != nil {
log.Error("Error starting the HTTP server", "error", err)
}
}()
return s
}
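A usage sketch for the builder-style constructor, assuming an existing ServiceFunc map named services; DefaultHTTPSimAddr is the package default shown above.

// Attach the HTTP endpoint while constructing the simulation.
sim := simulation.New(services).WithServer(simulation.DefaultHTTPSimAddr)
defer sim.Close()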
@@ -55,7 +60,7 @@ func (s *Simulation) addSimulationRoutes() {
s.handler.POST("/runsim", s.RunSimulation)
}
// StartNetwork starts all nodes in the network
// RunSimulation is the actual POST endpoint runner
func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request) {
log.Debug("RunSimulation endpoint running")
s.runC <- struct{}{}

@@ -96,7 +96,12 @@ func sendRunSignal(t *testing.T) {
if err != nil {
t.Fatalf("Request failed: %v", err)
}
defer resp.Body.Close()
defer func() {
err := resp.Body.Close()
if err != nil {
log.Error("Error closing response body", "err", err)
}
}()
log.Debug("Signal sent")
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)

@@ -195,7 +195,7 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i
return ids, nil
}
//Upload a snapshot
//UploadSnapshot uploads a snapshot to the simulation
//This method tries to open the json file provided, applies the config to all nodes
//and then loads the snapshot into the Simulation network
func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error {
@@ -203,7 +203,12 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption)
if err != nil {
return err
}
defer f.Close()
defer func() {
err := f.Close()
if err != nil {
log.Error("Error closing snapshot file", "err", err)
}
}()
jsonbyte, err := ioutil.ReadAll(f)
if err != nil {
return err
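Call-site sketch, mirroring the snapshot tests later in this commit; nodeCount is assumed to match one of the snapshot_<n>.json files under swarm/network/stream/testing.

// Bring up a pre-built (presumably healthy) kademlia from a snapshot file.
if err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)); err != nil {
	t.Fatal(err)
}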
@@ -294,7 +299,7 @@ func (s *Simulation) StopNode(id discover.NodeID) (err error) {
// StopRandomNode stops a random node.
func (s *Simulation) StopRandomNode() (id discover.NodeID, err error) {
n := s.randomUpNode()
n := s.RandomUpNode()
if n == nil {
return id, ErrNodeNotFound
}
@@ -324,18 +329,18 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
// randomUpNode returns a random SimNode that is up.
// RandomUpNode returns a random SimNode that is up.
// Arguments are NodeIDs for nodes that should not be returned.
func (s *Simulation) randomUpNode(exclude ...discover.NodeID) *adapters.SimNode {
func (s *Simulation) RandomUpNode(exclude ...discover.NodeID) *adapters.SimNode {
return s.randomNode(s.UpNodeIDs(), exclude...)
}
// randomUpNode returns a random SimNode that is not up.
// randomDownNode returns a random SimNode that is not up.
func (s *Simulation) randomDownNode(exclude ...discover.NodeID) *adapters.SimNode {
return s.randomNode(s.DownNodeIDs(), exclude...)
}
// randomUpNode returns a random SimNode from the slice of NodeIDs.
// randomNode returns a random SimNode from the slice of NodeIDs.
func (s *Simulation) randomNode(ids []discover.NodeID, exclude ...discover.NodeID) *adapters.SimNode {
for _, e := range exclude {
var i int
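RandomUpNode is exported so the stream tests can pick live nodes directly (per the commit message). A sketch, assuming a running simulation whose pivot was set with sim.SetPivotNode as in the delivery test below:

// Pick a random node that is up, excluding the pivot; nil means no candidate.
if n := sim.RandomUpNode(*sim.PivotNodeID()); n == nil {
	t.Fatal("no up node available")
}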

@@ -39,7 +39,7 @@ func (s *Simulation) Service(name string, id discover.NodeID) node.Service {
// RandomService returns a single Service by name on a
// randomly chosen node that is up.
func (s *Simulation) RandomService(name string) node.Service {
n := s.randomUpNode()
n := s.RandomUpNode()
if n == nil {
return nil
}

@@ -18,135 +18,70 @@ package stream
import (
"context"
"encoding/binary"
crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/storage/mock/db"
mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
colorable "github.com/mattn/go-colorable"
)
var (
deliveries map[discover.NodeID]*Delivery
stores map[discover.NodeID]storage.ChunkStore
toAddr func(discover.NodeID) *network.BzzAddr
peerCount func(discover.NodeID) int
adapter = flag.String("adapter", "sim", "type of simulation: sim|exec|docker")
loglevel = flag.Int("loglevel", 2, "verbosity of logs")
nodes = flag.Int("nodes", 0, "number of nodes")
chunks = flag.Int("chunks", 0, "number of chunks")
useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)")
)
longrunning = flag.Bool("longrunning", false, "do run long-running tests")
bucketKeyDB = simulation.BucketKey("db")
bucketKeyStore = simulation.BucketKey("store")
bucketKeyFileStore = simulation.BucketKey("filestore")
bucketKeyNetStore = simulation.BucketKey("netstore")
bucketKeyDelivery = simulation.BucketKey("delivery")
bucketKeyRegistry = simulation.BucketKey("registry")
var (
defaultSkipCheck bool
waitPeerErrC chan error
chunkSize = 4096
registries map[discover.NodeID]*TestRegistry
createStoreFunc func(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error)
getRetrieveFunc = defaultRetrieveFunc
subscriptionCount = 0
globalStore mock.GlobalStorer
globalStoreDir string
pof = pot.DefaultPof(256)
)
var services = adapters.Services{
"streamer": NewStreamerService,
"intervalsStreamer": newIntervalsStreamerService,
}
func init() {
flag.Parse()
// register the Delivery service which will run as a devp2p
// protocol when using the exec adapter
adapters.RegisterServices(services)
rand.Seed(time.Now().UnixNano())
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
func createGlobalStore() {
var err error
globalStoreDir, err = ioutil.TempDir("", "global.store")
func createGlobalStore() (string, *mockdb.GlobalStore, error) {
var globalStore *mockdb.GlobalStore
globalStoreDir, err := ioutil.TempDir("", "global.store")
if err != nil {
log.Error("Error initiating global store temp directory!", "err", err)
return
return "", nil, err
}
globalStore, err = db.NewGlobalStore(globalStoreDir)
globalStore, err = mockdb.NewGlobalStore(globalStoreDir)
if err != nil {
log.Error("Error initiating global store!", "err", err)
return "", nil, err
}
}
// NewStreamerService
func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
var err error
id := ctx.Config.ID
addr := toAddr(id)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
stores[id], err = createStoreFunc(id, addr)
if err != nil {
return nil, err
}
store := stores[id].(*storage.LocalStore)
db := storage.NewDBAPI(store)
delivery := NewDelivery(kad, db)
deliveries[id] = delivery
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: defaultSkipCheck,
DoRetrieve: false,
})
RegisterSwarmSyncerServer(r, db)
RegisterSwarmSyncerClient(r, db)
go func() {
waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
}()
fileStore := storage.NewFileStore(storage.NewNetStore(store, getRetrieveFunc(id)), storage.NewFileStoreParams())
testRegistry := &TestRegistry{Registry: r, fileStore: fileStore}
registries[id] = testRegistry
return testRegistry, nil
}
func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
return nil
}
func datadirsCleanup() {
for _, id := range ids {
os.RemoveAll(datadirs[id])
}
if globalStoreDir != "" {
os.RemoveAll(globalStoreDir)
}
}
//local stores need to be cleaned up after the sim is done
func localStoreCleanup() {
log.Info("Cleaning up...")
for _, id := range ids {
registries[id].Close()
stores[id].Close()
}
log.Info("Local store cleanup done")
return globalStoreDir, globalStore, nil
}
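With createGlobalStore returning the temp directory and the store instead of populating globals, cleanup is now the caller's job; a hedged sketch of the expected call site:

globalStoreDir, globalStore, err := createGlobalStore()
if err != nil {
	t.Fatal(err)
}
// the caller owns the temp directory and removes it when done
defer os.RemoveAll(globalStoreDir)
// pass globalStore to mock-store-backed local stores as needed
_ = globalStore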
func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
@@ -174,9 +109,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
db := storage.NewDBAPI(localStore)
delivery := NewDelivery(to, db)
streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: defaultSkipCheck,
})
streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
@@ -233,22 +166,6 @@ func (rrs *roundRobinStore) Close() {
}
}
type TestRegistry struct {
*Registry
fileStore *storage.FileStore
}
func (r *TestRegistry) APIs() []rpc.API {
a := r.Registry.APIs()
a = append(a, rpc.API{
Namespace: "stream",
Version: "3.0",
Service: r,
Public: true,
})
return a
}
func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
r, _ := fileStore.Retrieve(context.TODO(), hash)
buf := make([]byte, 1024)
@@ -265,185 +182,74 @@ func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
return total, nil
}
func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) {
return readAll(r.fileStore, hash[:])
}
func (r *TestRegistry) Start(server *p2p.Server) error {
return r.Registry.Start(server)
}
func (r *TestRegistry) Stop() error {
return r.Registry.Stop()
}
type TestExternalRegistry struct {
*Registry
}
func (r *TestExternalRegistry) APIs() []rpc.API {
a := r.Registry.APIs()
a = append(a, rpc.API{
Namespace: "stream",
Version: "3.0",
Service: r,
Public: true,
})
return a
}
func (r *TestExternalRegistry) GetHashes(ctx context.Context, peerId discover.NodeID, s Stream) (*rpc.Subscription, error) {
peer := r.getPeer(peerId)
func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) {
nodes := sim.UpNodeIDs()
nodeCnt := len(nodes)
log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
//array holding generated files
rfiles := make([]string, nodeCnt)
//array holding the root hashes of the files
rootAddrs := make([]storage.Address, nodeCnt)
client, err := peer.getClient(ctx, s)
var err error
//for every node, generate a file and upload
for i, id := range nodes {
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
return nil, nil, fmt.Errorf("Error accessing localstore")
}
fileStore := item.(*storage.FileStore)
//generate a file
rfiles[i], err = generateRandomFile()
if err != nil {
return nil, err
}
c := client.Client.(*testExternalClient)
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, fmt.Errorf("Subscribe not supported")
return nil, nil, err
}
sub := notifier.CreateSubscription()
go func() {
// if we begin sending event immediately some events
// will probably be dropped since the subscription ID might not be sent to
// the client.
// ref: rpc/subscription_test.go#L65
time.Sleep(1 * time.Second)
for {
select {
case h := <-c.hashes:
<-c.enableNotificationsC // wait for notification subscription to complete
if err := notifier.Notify(sub.ID, h); err != nil {
log.Warn(fmt.Sprintf("rpc sub notifier notify stream %s: %v", s, err))
}
case err := <-sub.Err():
//store it (upload it) on the FileStore
ctx := context.TODO()
rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
log.Debug("Uploaded random string file to node")
if err != nil {
log.Warn(fmt.Sprintf("caught subscription error in stream %s: %v", s, err))
}
case <-notifier.Closed():
log.Trace(fmt.Sprintf("rpc sub notifier closed"))
return
return nil, nil, err
}
}
}()
return sub, nil
}
func (r *TestExternalRegistry) EnableNotifications(peerId discover.NodeID, s Stream) error {
peer := r.getPeer(peerId)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, err := peer.getClient(ctx, s)
err = wait(ctx)
if err != nil {
return err
}
close(client.Client.(*testExternalClient).enableNotificationsC)
return nil
}
// TODO: merge functionalities of testExternalClient and testExternalServer
// with testClient and testServer.
type testExternalClient struct {
hashes chan []byte
db *storage.DBAPI
enableNotificationsC chan struct{}
}
func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
db: db,
enableNotificationsC: make(chan struct{}),
return nil, nil, err
}
}
func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
if chunk.ReqC == nil {
return nil
}
c.hashes <- hash
return func() {
chunk.WaitToStore()
rootAddrs[i] = rk
}
return rootAddrs, rfiles, nil
}
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
return nil
}
func (c *testExternalClient) Close() {}
const testExternalServerBatchSize = 10
type testExternalServer struct {
t string
keyFunc func(key []byte, index uint64)
sessionAt uint64
maxKeys uint64
streamer *TestExternalRegistry
}
func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
if keyFunc == nil {
keyFunc = binary.BigEndian.PutUint64
}
return &testExternalServer{
t: t,
keyFunc: keyFunc,
sessionAt: sessionAt,
maxKeys: maxKeys,
//generate a random file (string)
func generateRandomFile() (string, error) {
//generate a random file size between minFileSize and maxFileSize
fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
b := make([]byte, fileSize*1024)
_, err := crand.Read(b)
if err != nil {
log.Error("Error generating random file.", "err", err)
return "", err
}
return string(b), nil
}
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
if from == 0 && to == 0 {
from = s.sessionAt
to = s.sessionAt + testExternalServerBatchSize
}
if to-from > testExternalServerBatchSize {
to = from + testExternalServerBatchSize - 1
}
if from >= s.maxKeys && to > s.maxKeys {
return nil, 0, 0, nil, io.EOF
}
if to > s.maxKeys {
to = s.maxKeys
//create a local store for the given node
func createTestLocalStorageForID(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) {
var datadir string
var err error
datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
if err != nil {
return nil, "", err
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
var store storage.ChunkStore
params := storage.NewDefaultLocalStoreParams()
params.ChunkDbPath = datadir
params.BaseKey = addr.Over()
store, err = storage.NewTestLocalStoreForAddr(params)
if err != nil {
os.RemoveAll(datadir)
return nil, "", err
}
return b, from, to, nil, nil
}
func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
return make([]byte, 4096), nil
}
func (s *testExternalServer) Close() {}
// Sets the global value defaultSkipCheck.
// It should be used in test function defer to reset the global value
// to the original value.
//
// defer setDefaultSkipCheck(defaultSkipCheck)
// defaultSkipCheck = skipCheck
//
// This works as defer function arguments are evaluated as usual,
// but only the function body invocation is deferred.
func setDefaultSkipCheck(skipCheck bool) {
defaultSkipCheck = skipCheck
return store, datadir, nil
}

@@ -22,18 +22,19 @@ import (
crand "crypto/rand"
"fmt"
"io"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -308,159 +309,164 @@ func TestDeliveryFromNodes(t *testing.T) {
}
func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) {
defaultSkipCheck = skipCheck
toAddr = network.NewAddrFromNodeID
createStoreFunc = createTestLocalStorageFromSim
conf := &streamTesting.RunConfig{
Adapter: *adapter,
NodeCount: nodes,
ConnLevel: conns,
ToAddr: toAddr,
Services: services,
EnableMsgEvents: false,
}
sim, teardown, err := streamTesting.NewSimulation(conf)
var rpcSubscriptionsWg sync.WaitGroup
defer func() {
rpcSubscriptionsWg.Wait()
teardown()
}()
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
t.Fatal(err.Error())
return nil, nil, err
}
stores = make(map[discover.NodeID]storage.ChunkStore)
for i, id := range sim.IDs {
stores[id] = sim.Stores[i]
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
registries = make(map[discover.NodeID]*TestRegistry)
deliveries = make(map[discover.NodeID]*Delivery)
peerCount = func(id discover.NodeID) int {
if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
return 1
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
return 2
netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
},
})
defer sim.Close()
log.Info("Adding nodes to simulation")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err)
}
// here we distribute chunks of a random file into Stores of nodes 1 to nodes
rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
log.Info("Starting simulation")
ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
//determine the pivot node to be the first node of the simulation
sim.SetPivotNode(nodeIDs[0])
//distribute chunks of a random file into Stores of nodes 1 to nodes
//we will do this by creating a file store with an underlying round-robin store:
//the file store will create a hash for the uploaded file, but every chunk will be
//distributed to different nodes via round-robin scheduling
log.Debug("Writing file to round-robin file store")
//to do this, we create an array for chunkstores (length minus one, the pivot node)
stores := make([]storage.ChunkStore, len(nodeIDs)-1)
//we then need to get all stores from the sim....
lStores := sim.NodesItems(bucketKeyStore)
i := 0
//...iterate the buckets...
for id, bucketVal := range lStores {
//...and remove the one which is the pivot node
if id == *sim.PivotNodeID() {
continue
}
//the other ones are added to the array...
stores[i] = bucketVal.(storage.ChunkStore)
i++
}
//...which then gets passed to the round-robin file store
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
ctx := context.TODO()
fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
log.Debug("Storing data to file store")
fileHash, wait, err := roundRobinFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
// wait until all chunks stored
if err != nil {
t.Fatal(err.Error())
return err
}
err = wait(ctx)
if err != nil {
t.Fatal(err.Error())
}
errc := make(chan error, 1)
waitPeerErrC = make(chan error)
quitC := make(chan struct{})
defer close(quitC)
action := func(ctx context.Context) error {
// each node Subscribes to each other's swarmChunkServerStreamName
// need to wait till an asynchronous process registers the peers in streamer.peers
// that is used by Subscribe
// using a global err channel to share between action and node service
i := 0
for err := range waitPeerErrC {
if err != nil {
return fmt.Errorf("error waiting for peers: %s", err)
}
i++
if i == nodes {
break
}
return err
}
// each node but the last in the chain subscribes to the upstream swarm chunk server stream,
// which responds to chunk retrieve requests
for j := 0; j < nodes-1; j++ {
id := sim.IDs[j]
err := sim.CallClient(id, func(client *rpc.Client) error {
doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
if err != nil {
return err
//each of the nodes (except pivot node) subscribes to the stream of the next node
for j, node := range nodeIDs[0 : nodes-1] {
sid := nodeIDs[j+1]
item, ok := sim.NodeItem(node, bucketKeyRegistry)
if !ok {
return fmt.Errorf("No registry")
}
rpcSubscriptionsWg.Add(1)
go func() {
<-doneC
rpcSubscriptionsWg.Done()
}()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
sid := sim.IDs[j+1]
return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
})
registry := item.(*Registry)
err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
if err != nil {
return err
}
}
// create a retriever FileStore for the pivot node
delivery := deliveries[sim.IDs[0]]
retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
//get the pivot node's filestore
item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
pivotFileStore := item.(*storage.FileStore)
log.Debug("Starting retrieval routine")
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(fileStore, fileHash)
n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
if err != nil {
errc <- fmt.Errorf("requesting chunks action error: %v", err)
t.Fatalf("requesting chunks action error: %v", err)
}
}()
return nil
log.Debug("Waiting for kademlia")
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
t.Fatal(d.Error)
}
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
select {
case err := <-errc:
return false, err
case <-ctx.Done():
return false, ctx.Err()
default:
}
}()
//finally check that the pivot node gets all chunks via the root hash
log.Debug("Check retrieval")
success := true
var total int64
err := sim.CallClient(id, func(client *rpc.Client) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash))
})
total, err = readAll(pivotFileStore, fileHash)
if err != nil {
return err
}
log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
if err != nil || total != int64(size) {
return false, nil
}
return true, nil
success = false
}
conf.Step = &simulations.Step{
Action: action,
Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
// we are only testing the pivot node (net.Nodes[0])
Expect: &simulations.Expectation{
Nodes: sim.IDs[0:1],
Check: check,
},
}
startedAt := time.Now()
timeout := 300 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
result, err := sim.Run(ctx, conf)
finishedAt := time.Now()
if err != nil {
t.Fatalf("Setting up simulation failed: %v", err)
if !success {
return fmt.Errorf("Test failed, chunks not available on all nodes")
}
log.Debug("Test terminated successfully")
return nil
})
if result.Error != nil {
t.Fatalf("Simulation failed: %s", result.Error)
t.Fatal(result.Error)
}
streamTesting.CheckResult(t, result, startedAt, finishedAt)
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
@@ -490,142 +496,90 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
}
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
defaultSkipCheck = skipCheck
toAddr = network.NewAddrFromNodeID
createStoreFunc = createTestLocalStorageFromSim
registries = make(map[discover.NodeID]*TestRegistry)
timeout := 300 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conf := &streamTesting.RunConfig{
Adapter: *adapter,
NodeCount: nodes,
ConnLevel: conns,
ToAddr: toAddr,
Services: services,
EnableMsgEvents: false,
}
sim, teardown, err := streamTesting.NewSimulation(conf)
var rpcSubscriptionsWg sync.WaitGroup
defer func() {
rpcSubscriptionsWg.Wait()
teardown()
}()
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
b.Fatal(err.Error())
}
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
DoSync: true,
SyncUpdateDelay: 0,
})
stores = make(map[discover.NodeID]storage.ChunkStore)
deliveries = make(map[discover.NodeID]*Delivery)
for i, id := range sim.IDs {
stores[id] = sim.Stores[i]
}
peerCount = func(id discover.NodeID) int {
if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
return 1
}
return 2
retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
// wait channel for all nodes' peer connections to be set up
waitPeerErrC = make(chan error)
// create a FileStore for the last node in the chain which we are gonna write to
remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams())
// channel to signal simulation initialisation with action call complete
// or node disconnections
disconnectC := make(chan error)
quitC := make(chan struct{})
netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
initC := make(chan error)
return r, cleanup, nil
action := func(ctx context.Context) error {
// each node Subscribes to each other's swarmChunkServerStreamName
// need to wait till an asynchronous process registers the peers in streamer.peers
// that is used by Subscribe
// waitPeerErrC using a global err channel to share between action and node service
i := 0
for err := range waitPeerErrC {
if err != nil {
return fmt.Errorf("error waiting for peers: %s", err)
}
i++
if i == nodes {
break
}
}
var err error
// each node except the last one subscribes to the upstream swarm chunk server stream
// which responds to chunk retrieve requests
for j := 0; j < nodes-1; j++ {
id := sim.IDs[j]
err = sim.CallClient(id, func(client *rpc.Client) error {
doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
if err != nil {
return err
}
rpcSubscriptionsWg.Add(1)
go func() {
<-doneC
rpcSubscriptionsWg.Done()
}()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
sid := sim.IDs[j+1] // the upstream peer's id
return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
},
})
defer sim.Close()
log.Info("Initializing test config")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
break
b.Fatal(err)
}
ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
node := nodeIDs[len(nodeIDs)-1]
item, ok := sim.NodeItem(node, bucketKeyFileStore)
if !ok {
b.Fatal("No filestore")
}
initC <- err
return nil
}
remoteFileStore := item.(*storage.FileStore)
// the check function is only triggered when the benchmark finishes
trigger := make(chan discover.NodeID)
check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) {
return true, nil
pivotNode := nodeIDs[0]
item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
if !ok {
b.Fatal("No filestore")
}
netStore := item.(*storage.NetStore)
conf.Step = &simulations.Step{
Action: action,
Trigger: trigger,
// we are only testing the pivot node (net.Nodes[0])
Expect: &simulations.Expectation{
Nodes: sim.IDs[0:1],
Check: check,
},
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
// run the simulation in the background
errc := make(chan error)
go func() {
_, err := sim.Run(ctx, conf)
close(quitC)
errc <- err
}()
disconnections := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
// wait for simulation action to complete stream subscriptions
err = <-initC
if err != nil {
b.Fatalf("simulation failed to initialise. expected no error. got %v", err)
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
b.Fatal(d.Error)
}
// create a retriever FileStore for the pivot node
// by now deliveries are set for each node by the streamer service
delivery := deliveries[sim.IDs[0]]
retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
}()
// benchmark loop
b.ResetTimer()
b.StopTimer()
Loop:
Loop:
for i := 0; i < b.N; i++ {
// uploading chunkCount random chunks to the last node
hashes := make([]storage.Address, chunkCount)
@@ -670,38 +624,18 @@ Loop:
}
b.StopTimer()
select {
case err = <-disconnectC:
if err != nil {
break Loop
}
default:
}
if misses > 0 {
err = fmt.Errorf("%v chunk not found out of %v", misses, total)
break Loop
}
}
select {
case <-quitC:
case trigger <- sim.IDs[0]:
}
if err == nil {
err = <-errc
} else {
if e := <-errc; e != nil {
b.Errorf("sim.Run function error: %v", e)
}
}
// benchmark over, trigger the check function to conclude the simulation
if err != nil {
b.Fatalf("expected no error. got %v", err)
b.Fatal(err)
}
return nil
})
if result.Error != nil {
b.Fatal(result.Error)
}
}
func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
return stores[id], nil
}

@@ -22,52 +22,22 @@ import (
"encoding/binary"
"fmt"
"io"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
var (
externalStreamName = "externalStream"
externalStreamSessionAt uint64 = 50
externalStreamMaxKeys uint64 = 100
)
func newIntervalsStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
id := ctx.Config.ID
addr := toAddr(id)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
store := stores[id].(*storage.LocalStore)
db := storage.NewDBAPI(store)
delivery := NewDelivery(kad, db)
deliveries[id] = delivery
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: defaultSkipCheck,
})
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
return newTestExternalClient(db), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})
go func() {
waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
}()
return &TestExternalRegistry{r}, nil
}
func TestIntervals(t *testing.T) {
testIntervals(t, true, nil, false)
testIntervals(t, false, NewRange(9, 26), false)
@@ -81,94 +51,111 @@ func TestIntervals(t *testing.T) {
func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
nodes := 2
chunkCount := dataChunkCount
externalStreamName := "externalStream"
externalStreamSessionAt := uint64(50)
externalStreamMaxKeys := uint64(100)
defer setDefaultSkipCheck(defaultSkipCheck)
defaultSkipCheck = skipCheck
sim := simulation.New(map[string]simulation.ServiceFunc{
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
toAddr = network.NewAddrFromNodeID
conf := &streamTesting.RunConfig{
Adapter: *adapter,
NodeCount: nodes,
ConnLevel: 1,
ToAddr: toAddr,
Services: services,
DefaultService: "intervalsStreamer",
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
sim, teardown, err := streamTesting.NewSimulation(conf)
var rpcSubscriptionsWg sync.WaitGroup
defer func() {
rpcSubscriptionsWg.Wait()
teardown()
}()
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
return newTestExternalClient(db), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})
fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
},
})
defer sim.Close()
log.Info("Adding nodes to simulation")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err)
}
stores = make(map[discover.NodeID]storage.ChunkStore)
deliveries = make(map[discover.NodeID]*Delivery)
for i, id := range sim.IDs {
stores[id] = sim.Stores[i]
}
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()
peerCount = func(id discover.NodeID) int {
return 1
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
checker := nodeIDs[1]
item, ok := sim.NodeItem(storer, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
fileStore := item.(*storage.FileStore)
fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams())
size := chunkCount * chunkSize
ctx := context.TODO()
_, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
log.Error("Store error: %v", "err", err)
t.Fatal(err)
}
err = wait(ctx)
if err != nil {
log.Error("Wait error: %v", "err", err)
t.Fatal(err)
}
errc := make(chan error, 1)
waitPeerErrC = make(chan error)
quitC := make(chan struct{})
defer close(quitC)
action := func(ctx context.Context) error {
i := 0
for err := range waitPeerErrC {
if err != nil {
return fmt.Errorf("error waiting for peers: %s", err)
item, ok = sim.NodeItem(checker, bucketKeyRegistry)
if !ok {
return fmt.Errorf("No registry")
}
i++
if i == nodes {
break
}
}
id := sim.IDs[1]
err := sim.CallClient(id, func(client *rpc.Client) error {
registry := item.(*Registry)
sid := sim.IDs[0]
liveErrC := make(chan error)
historyErrC := make(chan error)
doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
if err != nil {
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
log.Error("WaitKademlia error: %v", "err", err)
return err
}
rpcSubscriptionsWg.Add(1)
go func() {
<-doneC
rpcSubscriptionsWg.Done()
}()
ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(externalStreamName, "", live), history, Top)
if err != nil {
return err
}
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
liveErrC := make(chan error)
historyErrC := make(chan error)
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
t.Fatal(d.Error)
}
}
}()
go func() {
if !live {
@@ -182,17 +169,16 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}()
// live stream
liveHashesChan := make(chan []byte)
liveSubscription, err := client.Subscribe(ctx, "stream", liveHashesChan, "getHashes", sid, NewStream(externalStreamName, "", true))
var liveHashesChan chan []byte
liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
log.Error("Subscription error: %v", "err", err)
return
}
defer liveSubscription.Unsubscribe()
i := externalStreamSessionAt
// we have subscribed, enable notifications
err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", true))
err = enableNotifications(registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
return
}
@@ -209,8 +195,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
if i > externalStreamMaxKeys {
return
}
case err = <-liveSubscription.Err():
return
case <-ctx.Done():
return
}
@@ -229,12 +213,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}()
// history stream
historyHashesChan := make(chan []byte)
historySubscription, err := client.Subscribe(ctx, "stream", historyHashesChan, "getHashes", sid, NewStream(externalStreamName, "", false))
var historyHashesChan chan []byte
historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
return
}
defer historySubscription.Unsubscribe()
var i uint64
historyTo := externalStreamMaxKeys
@@ -246,7 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
// we have subscribed, enable notifications
err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", false))
err = enableNotifications(registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
return
}
@@ -263,14 +246,16 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
if i > historyTo {
return
}
case err = <-historySubscription.Err():
return
case <-ctx.Done():
return
}
}
}()
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
if err != nil {
return err
}
if err := <-liveErrC; err != nil {
return err
}
@@ -280,38 +265,123 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
}
func getHashes(ctx context.Context, r *Registry, peerID discover.NodeID, s Stream) (chan []byte, error) {
peer := r.getPeer(peerID)
client, err := peer.getClient(ctx, s)
if err != nil {
return nil, err
}
c := client.Client.(*testExternalClient)
return c.hashes, nil
}
func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
peer := r.getPeer(peerID)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, err := peer.getClient(ctx, s)
if err != nil {
return err
}
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
select {
case err := <-errc:
return false, err
case <-ctx.Done():
return false, ctx.Err()
default:
close(client.Client.(*testExternalClient).enableNotificationsC)
return nil
}
type testExternalClient struct {
hashes chan []byte
db *storage.DBAPI
enableNotificationsC chan struct{}
}
func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
db: db,
enableNotificationsC: make(chan struct{}),
}
}
func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
if chunk.ReqC == nil {
return nil
}
return true, nil
c.hashes <- hash
//NOTE: This was failing on go1.9.x with a deadlock.
//Sometimes this function would just block
//It is commented out now, but after the chunk refactor it may well be worth
//re-enabling it to see whether the problem has been addressed
/*
return func() {
return chunk.WaitToStore()
}
*/
return nil
}
conf.Step = &simulations.Step{
Action: action,
Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
Expect: &simulations.Expectation{
Nodes: sim.IDs[1:1],
Check: check,
},
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
return nil
}
func (c *testExternalClient) Close() {}
const testExternalServerBatchSize = 10
type testExternalServer struct {
t string
keyFunc func(key []byte, index uint64)
sessionAt uint64
maxKeys uint64
}
func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
if keyFunc == nil {
keyFunc = binary.BigEndian.PutUint64
}
startedAt := time.Now()
timeout := 300 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
result, err := sim.Run(ctx, conf)
finishedAt := time.Now()
if err != nil {
t.Fatalf("Setting up simulation failed: %v", err)
return &testExternalServer{
t: t,
keyFunc: keyFunc,
sessionAt: sessionAt,
maxKeys: maxKeys,
}
if result.Error != nil {
t.Fatalf("Simulation failed: %s", result.Error)
}
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
if from == 0 && to == 0 {
from = s.sessionAt
to = s.sessionAt + testExternalServerBatchSize
}
streamTesting.CheckResult(t, result, startedAt, finishedAt)
if to-from > testExternalServerBatchSize {
to = from + testExternalServerBatchSize - 1
}
if from >= s.maxKeys && to > s.maxKeys {
return nil, 0, 0, nil, io.EOF
}
if to > s.maxKeys {
to = s.maxKeys
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
return b, from, to, nil, nil
}
func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
return make([]byte, 4096), nil
}
func (s *testExternalServer) Close() {}

@@ -17,20 +17,19 @@ package stream
import (
"context"
crand "crypto/rand"
"fmt"
"math/rand"
"strings"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -40,40 +39,6 @@ const (
maxFileSize = 40
)
func initRetrievalTest() {
//global func to get overlay address from discover ID
toAddr = func(id discover.NodeID) *network.BzzAddr {
addr := network.NewAddrFromNodeID(id)
return addr
}
//global func to create local store
createStoreFunc = createTestLocalStorageForId
//local stores
stores = make(map[discover.NodeID]storage.ChunkStore)
//data directories for each node and store
datadirs = make(map[discover.NodeID]string)
//deliveries for each node
deliveries = make(map[discover.NodeID]*Delivery)
//global retrieve func
getRetrieveFunc = func(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
return func(ctx context.Context, chunk *storage.Chunk) error {
skipCheck := true
return deliveries[id].RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
}
//registries, map of discover.NodeID to its streamer
registries = make(map[discover.NodeID]*TestRegistry)
//not needed for this test but required from common_test for NewStreamService
waitPeerErrC = make(chan error)
//also not needed for this test but required for NewStreamService
peerCount = func(id discover.NodeID) int {
if ids[0] == id || ids[len(ids)-1] == id {
return 1
}
return 2
}
}
//This test is a retrieval test for nodes.
//A configurable number of nodes can be
//provided to the test.
@@ -81,7 +46,10 @@ func initRetrievalTest() {
//Number of nodes can be provided via commandline too.
func TestFileRetrieval(t *testing.T) {
if *nodes != 0 {
fileRetrievalTest(t, *nodes)
err := runFileRetrievalTest(*nodes)
if err != nil {
t.Fatal(err)
}
} else {
nodeCnt := []int{16}
//if the `longrunning` flag has been provided
@@ -90,7 +58,10 @@ func TestFileRetrieval(t *testing.T) {
nodeCnt = append(nodeCnt, 32, 64, 128)
}
for _, n := range nodeCnt {
fileRetrievalTest(t, n)
err := runFileRetrievalTest(n)
if err != nil {
t.Fatal(err)
}
}
}
}
@@ -105,7 +76,10 @@ func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
retrievalTest(t, *chunks, *nodes)
err := runRetrievalTest(*chunks, *nodes)
if err != nil {
t.Fatal(err)
}
} else {
var nodeCnt []int
var chnkCnt []int
@@ -121,76 +95,17 @@ func TestRetrieval(t *testing.T) {
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
retrievalTest(t, c, n)
}
}
}
}
//Every test runs 3 times, a live, a history, and a live AND history
func fileRetrievalTest(t *testing.T, nodeCount int) {
//test live and NO history
log.Info("Testing live and no history", "nodeCount", nodeCount)
live = true
history = false
err := runFileRetrievalTest(nodeCount)
err := runRetrievalTest(c, n)
if err != nil {
t.Fatal(err)
}
//test history only
log.Info("Testing history only", "nodeCount", nodeCount)
live = false
history = true
err = runFileRetrievalTest(nodeCount)
if err != nil {
t.Fatal(err)
}
//finally test live and history
log.Info("Testing live and history", "nodeCount", nodeCount)
live = true
err = runFileRetrievalTest(nodeCount)
if err != nil {
t.Fatal(err)
}
}
//Every test runs 3 times, a live, a history, and a live AND history
func retrievalTest(t *testing.T, chunkCount int, nodeCount int) {
//test live and NO history
log.Info("Testing live and no history", "chunkCount", chunkCount, "nodeCount", nodeCount)
live = true
history = false
err := runRetrievalTest(chunkCount, nodeCount)
if err != nil {
t.Fatal(err)
}
//test history only
log.Info("Testing history only", "chunkCount", chunkCount, "nodeCount", nodeCount)
live = false
history = true
err = runRetrievalTest(chunkCount, nodeCount)
if err != nil {
t.Fatal(err)
}
//finally test live and history
log.Info("Testing live and history", "chunkCount", chunkCount, "nodeCount", nodeCount)
live = true
err = runRetrievalTest(chunkCount, nodeCount)
if err != nil {
t.Fatal(err)
}
}
/*
The upload is done by dependency to the global
`live` and `history` variables;
If `live` is set, first stream subscriptions are established,
then files are uploaded to nodes.
If `history` is enabled, first upload files, then build up subscriptions.
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. Nevertheless a health check runs in the
@@ -199,261 +114,129 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
func runFileRetrievalTest(nodeCount int) error {
//for every run (live, history), int the variables
initRetrievalTest()
//the ids of the snapshot nodes, initiate only now as we need nodeCount
ids = make([]discover.NodeID, nodeCount)
//channel to check for disconnection errors
disconnectC := make(chan error)
//channel to close disconnection watcher routine
quitC := make(chan struct{})
//the test conf (using the same as in `snapshot_sync_test`)
conf = &synctestConfig{}
//map of overlay address to discover ID
conf.addrToIdMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
//load nodes from the snapshot file
net, err := initNetWithSnapshot(nodeCount)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
return err
return nil, nil, err
}
var rpcSubscriptionsWg sync.WaitGroup
//do cleanup after test is terminated
defer func() {
//shutdown the snapshot network
net.Shutdown()
//after the test, clean up local stores initialized with createLocalStoreForId
localStoreCleanup()
//finally clear all data directories
datadirsCleanup()
}()
//get the nodes of the network
nodes := net.GetNodes()
//iterate over all nodes...
for c := 0; c < len(nodes); c++ {
//create an array of discovery nodeIDS
ids[c] = nodes[c].ID()
a := network.ToOverlayAddr(ids[c].Bytes())
//append it to the array of all overlay addresses
conf.addrs = append(conf.addrs, a)
conf.addrToIdMap[string(a)] = ids[c]
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
//needed for healthy call
ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
//an array for the random files
var randomFiles []string
//channel to signal when the upload has finished
uploadFinished := make(chan struct{})
//channel to trigger new node checks
trigger := make(chan discover.NodeID)
//simulation action
action := func(ctx context.Context) error {
//first run the health check on all nodes,
//wait until nodes are all healthy
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
healthy := true
for _, id := range ids {
r := registries[id]
//PeerPot for this node
addr := common.Bytes2Hex(r.addr.OAddr)
pp := ppmap[addr]
//call Healthy RPC
h := r.delivery.overlay.Healthy(pp)
//print info
log.Debug(r.delivery.overlay.String())
log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
if !h.GotNN || !h.Full {
healthy = false
break
}
}
if healthy {
break
}
}
if history {
log.Info("Uploading for history")
//If testing only history, we upload the chunk(s) first
conf.hashes, randomFiles, err = uploadFilesToNodes(nodes)
if err != nil {
return err
}
}
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
//variables needed to wait for all subscriptions established before uploading
errc := make(chan error)
return r, cleanup, nil
//now setup and start event watching in order to know when we can upload
ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
defer watchCancel()
},
})
defer sim.Close()
log.Info("Setting up stream subscription")
//We need two iterations, one to subscribe to the subscription events
//(so we know when setup phase is finished), and one to
//actually run the stream subscriptions. We can't do it in the same iteration,
//because while the first nodes in the loop are setting up subscriptions,
//the latter ones have not subscribed to listen to peer events yet,
//and then we miss events.
log.Info("Initializing test config")
//first iteration: setup disconnection watcher and subscribe to peer events
for j, id := range ids {
log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
client, err := net.GetNode(id).Client()
if err != nil {
return err
}
wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
// doneC is nil; the error has already been sent to the errc channel
if wsDoneC == nil {
continue
}
rpcSubscriptionsWg.Add(1)
go func() {
<-wsDoneC
rpcSubscriptionsWg.Done()
}()
conf := &synctestConfig{}
//map of discover ID to indexes of chunks expected at that ID
conf.idToChunksMap = make(map[discover.NodeID][]int)
//map of overlay address to discover ID
conf.addrToIDMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
//watch for peers disconnecting
wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
return err
}
rpcSubscriptionsWg.Add(1)
go func() {
<-wdDoneC
rpcSubscriptionsWg.Done()
}()
}
//second iteration: start syncing and setup stream subscriptions
for j, id := range ids {
log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j))
client, err := net.GetNode(id).Client()
if err != nil {
return err
}
//start syncing!
var cnt int
err = client.CallContext(ctx, &cnt, "stream_startSyncing")
if err != nil {
return err
}
//increment the number of subscriptions we need to wait for
//by the count returned from startSyncing (SYNC subscriptions)
subscriptionCount += cnt
//now also add the number of RETRIEVAL_REQUEST subscriptions
for snid := range registries[id].peers {
subscriptionCount++
err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top)
if err != nil {
return err
}
}
}
ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelSimRun()
//now wait until the expected number of subscriptions has been reached
//`watchSubscriptionEvents` will write a `nil` value to errc
//every time a `SubscriptionMsg` has been received
for err := range errc {
if err != nil {
return err
}
//`nil` received, decrement count
subscriptionCount--
//all subscriptions received
if subscriptionCount == 0 {
break
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
a := network.ToOverlayAddr(n.Bytes())
//append it to the array of all overlay addresses
conf.addrs = append(conf.addrs, a)
//the proximity calculation is on overlay addr,
//the p2p/simulations check func triggers on discover.NodeID,
//so we need to know which overlay addr maps to which nodeID
conf.addrToIDMap[string(a)] = n
}
log.Info("Stream subscriptions successfully requested, action terminated")
//an array for the random files
var randomFiles []string
//channel to signal when the upload has finished
//uploadFinished := make(chan struct{})
//channel to trigger new node checks
if live {
//upload generated files to nodes
var hashes []storage.Address
var rfiles []string
hashes, rfiles, err = uploadFilesToNodes(nodes)
conf.hashes, randomFiles, err = uploadFilesToNodes(sim)
if err != nil {
return err
}
conf.hashes = append(conf.hashes, hashes...)
randomFiles = append(randomFiles, rfiles...)
//signal to the trigger loop that the upload has finished
uploadFinished <- struct{}{}
}
return nil
}
//check defines what will be checked during the test
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case e := <-disconnectC:
log.Error(e.Error())
return false, fmt.Errorf("Disconnect event detected, network unhealthy")
default:
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
log.Trace(fmt.Sprintf("Checking node: %s", id))
//if there are more than one chunk, test only succeeds if all expected chunks are found
allSuccess := true
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
for !allSuccess {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
localSuccess := true
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
//check if the expected chunk is indeed in the localstore
var err error
//check on the node's FileStore (netstore)
fileStore := registries[id].fileStore
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No registry")
}
fileStore := item.(*storage.FileStore)
//check all chunks
for i, hash := range conf.hashes {
reader, _ := fileStore.Retrieve(context.TODO(), hash)
//check that we can read the file size and that it corresponds to the generated file size
if s, err := reader.Size(context.TODO(), nil); err != nil || s != int64(len(randomFiles[i])) {
if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
allSuccess = false
log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
} else {
log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
}
}
return allSuccess, nil
if err != nil {
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
localSuccess = false
} else {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
//for each tick, run the checks on all nodes
timingTicker := time.NewTicker(5 * time.Second)
defer timingTicker.Stop()
go func() {
//for live upload, we should wait for uploads to have finished
//before starting to trigger the checks, due to file size
if live {
<-uploadFinished
}
for range timingTicker.C {
for i := 0; i < len(ids); i++ {
log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
trigger <- ids[i]
allSuccess = localSuccess
}
}
}()
log.Info("Starting simulation run...")
timeout := MaxTimeout * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
//run the simulation
result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
Action: action,
Trigger: trigger,
Expect: &simulations.Expectation{
Nodes: ids,
Check: check,
},
if !allSuccess {
return fmt.Errorf("Not all chunks succeeded!")
}
return nil
})
if result.Error != nil {
@ -466,14 +249,6 @@ func runFileRetrievalTest(nodeCount int) error {
/*
The test generates the given number of chunks.
The upload is done depending on the global
`live` and `history` variables;
If `live` is set, first stream subscriptions are established, then
upload to a random node.
If `history` is enabled, first upload then build up subscriptions.
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. Nevertheless a health check runs in the
@ -482,259 +257,129 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
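The file-retrieval check used by these retrieval tests boils down to polling every up node's FileStore until each uploaded file can be read back at its expected size. A hedged sketch of that loop, using only the simulation and storage calls that appear in this diff (the helper name checkAllFilesRetrievable is illustrative, not part of the change):

// sketch only: illustrative helper, not part of this change
func checkAllFilesRetrievable(ctx context.Context, sim *simulation.Simulation, hashes []storage.Address, files []string) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        success := true
        for _, id := range sim.UpNodeIDs() {
            item, ok := sim.NodeItem(id, bucketKeyFileStore)
            if !ok {
                return fmt.Errorf("no filestore for node %s", id)
            }
            fileStore := item.(*storage.FileStore)
            for i, hash := range hashes {
                reader, _ := fileStore.Retrieve(ctx, hash)
                //a file counts as retrieved when its size can be read
                //and matches the generated content length
                if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(files[i])) {
                    success = false
                }
            }
        }
        if success {
            return nil
        }
    }
}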
func runRetrievalTest(chunkCount int, nodeCount int) error {
//for every run (live, history), init the variables
initRetrievalTest()
//the ids of the snapshot nodes, initialized only now as we need nodeCount
ids = make([]discover.NodeID, nodeCount)
//channel to check for disconnection errors
disconnectC := make(chan error)
//channel to close disconnection watcher routine
quitC := make(chan struct{})
//the test conf (using the same as in `snapshot_sync_test`)
conf = &synctestConfig{}
//map of overlay address to discover ID
conf.addrToIdMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
//load nodes from the snapshot file
net, err := initNetWithSnapshot(nodeCount)
if err != nil {
return err
}
var rpcSubscriptionsWg sync.WaitGroup
//do cleanup after test is terminated
defer func() {
//shutdown the snapshot network
net.Shutdown()
//after the test, clean up local stores initialized with createLocalStoreForId
localStoreCleanup()
//finally clear all data directories
datadirsCleanup()
}()
//get the nodes of the network
nodes := net.GetNodes()
//select one index at random...
idx := rand.Intn(len(nodes))
//...and get the node at that index
//this is the node selected for upload
uploadNode := nodes[idx]
//iterate over all nodes...
for c := 0; c < len(nodes); c++ {
//create an array of discovery nodeIDS
ids[c] = nodes[c].ID()
a := network.ToOverlayAddr(ids[c].Bytes())
//append it to the array of all overlay addresses
conf.addrs = append(conf.addrs, a)
conf.addrToIdMap[string(a)] = ids[c]
}
//needed for healthy call
ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
trigger := make(chan discover.NodeID)
//simulation action
action := func(ctx context.Context) error {
//first run the health check on all nodes,
//wait until nodes are all healthy
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
healthy := true
for _, id := range ids {
r := registries[id]
//PeerPot for this node
addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
pp := ppmap[addr]
//call Healthy RPC
h := r.delivery.overlay.Healthy(pp)
//print info
log.Debug(r.delivery.overlay.String())
log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
if !h.GotNN || !h.Full {
healthy = false
break
}
}
if healthy {
break
}
}
if history {
log.Info("Uploading for history")
//If testing only history, we upload the chunk(s) first
conf.hashes, err = uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
return err
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
//variables needed to wait for all subscriptions established before uploading
errc := make(chan error)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 0,
})
//now setup and start event watching in order to know when we can upload
ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
defer watchCancel()
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
bucketKeyFileStore = simulation.BucketKey("filestore")
bucket.Store(bucketKeyFileStore, fileStore)
log.Info("Setting up stream subscription")
//We need two iterations, one to subscribe to the subscription events
//(so we know when setup phase is finished), and one to
//actually run the stream subscriptions. We can't do it in the same iteration,
//because while the first nodes in the loop are setting up subscriptions,
//the latter ones have not subscribed to listen to peer events yet,
//and then we miss events.
return r, cleanup, nil
//first iteration: setup disconnection watcher and subscribe to peer events
for j, id := range ids {
log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
client, err := net.GetNode(id).Client()
if err != nil {
return err
}
},
})
defer sim.Close()
//check for `SubscribeMsg` events to know when setup phase is complete
wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
// doneC is nil; the error has already been sent to the errc channel
if wsDoneC == nil {
continue
}
rpcSubscriptionsWg.Add(1)
go func() {
<-wsDoneC
rpcSubscriptionsWg.Done()
}()
conf := &synctestConfig{}
//map of discover ID to indexes of chunks expected at that ID
conf.idToChunksMap = make(map[discover.NodeID][]int)
//map of overlay address to discover ID
conf.addrToIDMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
//watch for peers disconnecting
wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
return err
}
rpcSubscriptionsWg.Add(1)
go func() {
<-wdDoneC
rpcSubscriptionsWg.Done()
}()
ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
a := network.ToOverlayAddr(n.Bytes())
//append it to the array of all overlay addresses
conf.addrs = append(conf.addrs, a)
//the proximity calculation is on overlay addr,
//the p2p/simulations check func triggers on discover.NodeID,
//so we need to know which overlay addr maps to which nodeID
conf.addrToIDMap[string(a)] = n
}
//second iteration: start syncing and setup stream subscriptions
for j, id := range ids {
log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j))
client, err := net.GetNode(id).Client()
if err != nil {
return err
//an array for the random files
var randomFiles []string
//this is the node selected for upload
node := sim.RandomUpNode()
item, ok := sim.NodeItem(node.ID, bucketKeyStore)
if !ok {
return fmt.Errorf("No localstore")
}
//start syncing!
var cnt int
err = client.CallContext(ctx, &cnt, "stream_startSyncing")
lstore := item.(*storage.LocalStore)
conf.hashes, err = uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
if err != nil {
return err
}
//increment the number of subscriptions we need to wait for
//by the count returned from startSyncing (SYNC subscriptions)
subscriptionCount += cnt
//now also add the number of RETRIEVAL_REQUEST subscriptions
for snid := range registries[id].peers {
subscriptionCount++
err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top)
if err != nil {
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
}
}
//now wait until the expected number of subscriptions has been reached
//`watchSubscriptionEvents` will write a `nil` value to errc
//every time a `SubscriptionMsg` has been received
for err := range errc {
if err != nil {
return err
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
for !allSuccess {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
localSuccess := true
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
//check if the expected chunk is indeed in the localstore
var err error
//check on the node's FileStore (netstore)
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No registry")
}
//`nil` received, decrement count
subscriptionCount--
//all subscriptions received
if subscriptionCount == 0 {
break
fileStore := item.(*storage.FileStore)
//check all chunks
for i, hash := range conf.hashes {
reader, _ := fileStore.Retrieve(context.TODO(), hash)
//check that we can read the file size and that it corresponds to the generated file size
if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
allSuccess = false
log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
} else {
log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
}
}
log.Info("Stream subscriptions successfully requested, action terminated")
if live {
//now upload the chunks to the selected random single node
chnks, err := uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount)
if err != nil {
return err
}
conf.hashes = append(conf.hashes, chnks...)
}
return nil
}
chunkSize := storage.DefaultChunkSize
//check defines what will be checked during the test
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
//don't check the uploader node
if id == uploadNode.ID() {
return true, nil
}
select {
case <-ctx.Done():
return false, ctx.Err()
case e := <-disconnectC:
log.Error(e.Error())
return false, fmt.Errorf("Disconnect event detected, network unhealthy")
default:
}
log.Trace(fmt.Sprintf("Checking node: %s", id))
//if there are more than one chunk, test only succeeds if all expected chunks are found
allSuccess := true
//check on the node's FileStore (netstore)
fileStore := registries[id].fileStore
//check all chunks
for _, chnk := range conf.hashes {
reader, _ := fileStore.Retrieve(context.TODO(), chnk)
//assuming that reading the Size of the chunk is enough to know we found it
if s, err := reader.Size(context.TODO(), nil); err != nil || s != chunkSize {
allSuccess = false
log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id)
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
localSuccess = false
} else {
log.Debug(fmt.Sprintf("Chunk %x found", chnk))
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
return allSuccess, nil
allSuccess = localSuccess
}
//for each tick, run the checks on all nodes
timingTicker := time.NewTicker(5 * time.Second)
defer timingTicker.Stop()
go func() {
for range timingTicker.C {
for i := 0; i < len(ids); i++ {
log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
trigger <- ids[i]
}
if !allSuccess {
return fmt.Errorf("Not all chunks succeeded!")
}
}()
log.Info("Starting simulation run...")
timeout := MaxTimeout * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
//run the simulation
result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
Action: action,
Trigger: trigger,
Expect: &simulations.Expectation{
Nodes: ids,
Check: check,
},
return nil
})
if result.Error != nil {
@ -743,53 +388,3 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
return nil
}
//upload generated files to nodes
//every node gets one file uploaded
func uploadFilesToNodes(nodes []*simulations.Node) ([]storage.Address, []string, error) {
nodeCnt := len(nodes)
log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
//array holding generated files
rfiles := make([]string, nodeCnt)
//array holding the root hashes of the files
rootAddrs := make([]storage.Address, nodeCnt)
var err error
//for every node, generate a file and upload
for i, n := range nodes {
id := n.ID()
fileStore := registries[id].fileStore
//generate a file
rfiles[i], err = generateRandomFile()
if err != nil {
return nil, nil, err
}
//store it (upload it) on the FileStore
ctx := context.TODO()
rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
log.Debug("Uploaded random string file to node")
if err != nil {
return nil, nil, err
}
err = wait(ctx)
if err != nil {
return nil, nil, err
}
rootAddrs[i] = rk
}
return rootAddrs, rfiles, nil
}
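The migrated call sites above invoke this helper as uploadFilesToNodes(sim), so the netsim variant presumably walks sim.UpNodeIDs() and takes each node's FileStore from the bucket instead of the old global registries map. A hedged sketch of that shape, reusing only calls that already appear in this diff (the Sketch suffix marks it as illustrative):

// sketch only: a plausible netsim form of uploadFilesToNodes(sim)
func uploadFilesToNodesSketch(sim *simulation.Simulation) ([]storage.Address, []string, error) {
    nodeIDs := sim.UpNodeIDs()
    rfiles := make([]string, len(nodeIDs))
    rootAddrs := make([]storage.Address, len(nodeIDs))
    for i, id := range nodeIDs {
        item, ok := sim.NodeItem(id, bucketKeyFileStore)
        if !ok {
            return nil, nil, fmt.Errorf("no filestore for node %s", id)
        }
        fileStore := item.(*storage.FileStore)
        var err error
        //generate a file
        rfiles[i], err = generateRandomFile()
        if err != nil {
            return nil, nil, err
        }
        //store it (upload it) on the FileStore
        ctx := context.TODO()
        rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
        if err != nil {
            return nil, nil, err
        }
        if err := wait(ctx); err != nil {
            return nil, nil, err
        }
        rootAddrs[i] = rk
    }
    return rootAddrs, rfiles, nil
}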
//generate a random file (string)
func generateRandomFile() (string, error) {
//generate a random file size between minFileSize and maxFileSize
fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
b := make([]byte, fileSize*1024)
_, err := crand.Read(b)
if err != nil {
log.Error("Error generating random file.", "err", err)
return "", err
}
return string(b), nil
}
@ -18,12 +18,8 @@ package stream
import (
"context"
crand "crypto/rand"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"
@ -31,82 +27,27 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
const testMinProxBinSize = 2
const MaxTimeout = 600
var (
pof = pot.DefaultPof(256)
conf *synctestConfig
ids []discover.NodeID
datadirs map[discover.NodeID]string
ppmap map[string]*network.PeerPot
live bool
history bool
longrunning = flag.Bool("longrunning", false, "do run long-running tests")
)
type synctestConfig struct {
addrs [][]byte
hashes []storage.Address
idToChunksMap map[discover.NodeID][]int
chunksToNodesMap map[string][]int
addrToIdMap map[string]discover.NodeID
}
func init() {
rand.Seed(time.Now().Unix())
}
//common_test needs to initialize the test in an init() func
//in order for adapters to register the NewStreamerService;
//this service depends on some global variables,
//so we need to initialize them first in an init() as well.
func initSyncTest() {
//assign the toAddr func so NewStreamerService can build the addr
toAddr = func(id discover.NodeID) *network.BzzAddr {
addr := network.NewAddrFromNodeID(id)
return addr
}
//global func to create local store
if *useMockStore {
createStoreFunc = createMockStore
} else {
createStoreFunc = createTestLocalStorageForId
}
//local stores
stores = make(map[discover.NodeID]storage.ChunkStore)
//data directories for each node and store
datadirs = make(map[discover.NodeID]string)
//deliveries for each node
deliveries = make(map[discover.NodeID]*Delivery)
//registries, map of discover.NodeID to its streamer
registries = make(map[discover.NodeID]*TestRegistry)
//not needed for this test but required from common_test for NewStreamService
waitPeerErrC = make(chan error)
//also not needed for this test but required for NewStreamService
peerCount = func(id discover.NodeID) int {
if ids[0] == id || ids[len(ids)-1] == id {
return 1
}
return 2
}
if *useMockStore {
createGlobalStore()
}
addrToIDMap map[string]discover.NodeID
}
//This test is a syncing test for nodes.
@ -116,12 +57,12 @@ func initSyncTest() {
//to the pivot node, and we check that nodes get the chunks
//they are expected to store based on the syncing protocol.
//Number of chunks and nodes can be provided via commandline too.
func TestSyncing(t *testing.T) {
func TestSyncingViaGlobalSync(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
testSyncing(t, *chunks, *nodes)
testSyncingViaGlobalSync(t, *chunks, *nodes)
} else {
var nodeCnt []int
var chnkCnt []int
@ -138,230 +79,279 @@ func TestSyncing(t *testing.T) {
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
testSyncing(t, chnk, n)
testSyncingViaGlobalSync(t, chnk, n)
}
}
}
}
//Do run the tests
//Every test runs 3 times, a live, a history, and a live AND history
func testSyncing(t *testing.T, chunkCount int, nodeCount int) {
//test live and NO history
log.Info("Testing live and no history")
live = true
history = false
err := runSyncTest(chunkCount, nodeCount, live, history)
func TestSyncingViaDirectSubscribe(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
err := testSyncingViaDirectSubscribe(*chunks, *nodes)
if err != nil {
t.Fatal(err)
}
//test history only
log.Info("Testing history only")
live = false
history = true
err = runSyncTest(chunkCount, nodeCount, live, history)
if err != nil {
t.Fatal(err)
} else {
var nodeCnt []int
var chnkCnt []int
//if the `longrunning` flag has been provided
//run more test combinations
if *longrunning {
chnkCnt = []int{1, 8, 32, 256, 1024}
nodeCnt = []int{32, 16}
} else {
//default test
chnkCnt = []int{4, 32}
nodeCnt = []int{32, 16}
}
//finally test live and history
log.Info("Testing live and history")
live = true
err = runSyncTest(chunkCount, nodeCount, live, history)
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
err := testSyncingViaDirectSubscribe(chnk, n)
if err != nil {
t.Fatal(err)
}
}
}
}
}
/*
The test generates the given number of chunks
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
The upload is done depending on the global
`live` and `history` variables;
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
If `live` is set, first stream subscriptions are established, then
upload to a random node.
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
bucket.Store(bucketKeyRegistry, r)
If `history` is enabled, first upload then build up subscriptions.
return r, cleanup, nil
For every chunk generated, the nearest node addresses
are identified; we verify that the nodes closer to the
chunk addresses actually do have the chunks in their local stores.
},
})
defer sim.Close()
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
log.Info("Initializing test config")
For every test run, a series of three tests will be executed:
- a LIVE test first, where first subscriptions are established,
then a file (random chunks) is uploaded
- a HISTORY test, where the file is uploaded first, and then
the subscriptions are established
- a crude LIVE AND HISTORY test last, where (different) chunks
are uploaded twice, once before and once after subscriptions
*/
func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
initSyncTest()
//the ids of the snapshot nodes, initialized only now as we need nodeCount
ids = make([]discover.NodeID, nodeCount)
//initialize the test struct
conf = &synctestConfig{}
conf := &synctestConfig{}
//map of discover ID to indexes of chunks expected at that ID
conf.idToChunksMap = make(map[discover.NodeID][]int)
//map of overlay address to discover ID
conf.addrToIdMap = make(map[string]discover.NodeID)
conf.addrToIDMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
//channel to trigger node checks in the simulation
trigger := make(chan discover.NodeID)
//channel to check for disconnection errors
disconnectC := make(chan error)
//channel to close disconnection watcher routine
quitC := make(chan struct{})
//load nodes from the snapshot file
net, err := initNetWithSnapshot(nodeCount)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
return err
t.Fatal(err)
}
var rpcSubscriptionsWg sync.WaitGroup
//do cleanup after test is terminated
defer func() {
// close quitC channel to signal all goroutines to clean up
// before calling simulation network shutdown.
close(quitC)
//wait for all rpc subscriptions to unsubscribe
rpcSubscriptionsWg.Wait()
//shutdown the snapshot network
net.Shutdown()
//after the test, clean up local stores initialized with createLocalStoreForId
localStoreCleanup()
//finally clear all data directories
datadirsCleanup()
}()
//get the nodes of the network
nodes := net.GetNodes()
//select one index at random...
idx := rand.Intn(len(nodes))
//...and get the node at that index
//this is the node selected for upload
node := nodes[idx]
log.Info("Initializing test config")
//iterate over all nodes...
for c := 0; c < len(nodes); c++ {
//create an array of discovery node IDs
ids[c] = nodes[c].ID()
ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelSimRun()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
a := network.ToOverlayAddr(ids[c].Bytes())
a := network.ToOverlayAddr(n.Bytes())
//append it to the array of all overlay addresses
conf.addrs = append(conf.addrs, a)
//the proximity calculation is on overlay addr,
//the p2p/simulations check func triggers on discover.NodeID,
//so we need to know which overlay addr maps to which nodeID
conf.addrToIdMap[string(a)] = ids[c]
}
log.Info("Test config successfully initialized")
//only needed for healthy call when debugging
ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
//define the action to be performed before the test checks: start syncing
action := func(ctx context.Context) error {
//first run the health check on all nodes,
//wait until nodes are all healthy
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
healthy := true
for _, id := range ids {
r := registries[id]
//PeerPot for this node
addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
pp := ppmap[addr]
//call Healthy RPC
h := r.delivery.overlay.Healthy(pp)
//print info
log.Debug(r.delivery.overlay.String())
log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
if !h.GotNN || !h.Full {
healthy = false
break
conf.addrToIDMap[string(a)] = n
}
//get the node at that index
//this is the node selected for upload
node := sim.RandomUpNode()
item, ok := sim.NodeItem(node.ID, bucketKeyStore)
if !ok {
return fmt.Errorf("No localstore")
}
if healthy {
break
lstore := item.(*storage.LocalStore)
hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
if err != nil {
return err
}
conf.hashes = append(conf.hashes, hashes...)
mapKeysToNodes(conf)
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
if history {
log.Info("Uploading for history")
//If testing only history, we upload the chunk(s) first
chunks, err := uploadFileToSingleNodeStore(node.ID(), chunkCount)
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
var gDir string
var globalStore *mockdb.GlobalStore
if *useMockStore {
gDir, globalStore, err = createGlobalStore()
if err != nil {
return err
return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
}
conf.hashes = append(conf.hashes, chunks...)
//finally map chunks to the closest addresses
mapKeysToNodes(conf)
defer func() {
os.RemoveAll(gDir)
err := globalStore.Close()
if err != nil {
log.Error("Error closing global store! %v", "err", err)
}
}()
}
for !allSuccess {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
localSuccess := true
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
//check if the expected chunk is indeed in the localstore
var err error
if *useMockStore {
//use the globalStore if the mockStore should be used; in that case,
//the complete localStore stack is bypassed for getting the chunk
_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
} else {
//use the actual localstore
item, ok := sim.NodeItem(id, bucketKeyStore)
if !ok {
return fmt.Errorf("Error accessing localstore")
}
lstore := item.(*storage.LocalStore)
_, err = lstore.Get(ctx, chunk)
}
if err != nil {
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
localSuccess = false
} else {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
allSuccess = localSuccess
}
}
if !allSuccess {
return fmt.Errorf("Not all chunks succeeded!")
}
return nil
})
//variables needed to wait for all subscriptions established before uploading
errc := make(chan error)
if result.Error != nil {
t.Fatal(result.Error)
}
}
//now setup and start event watching in order to know when we can upload
ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
defer watchCancel()
/*
The test generates the given number of chunks
log.Info("Setting up stream subscription")
For every chunk generated, the nearest node addresses
are identified; we verify that the nodes closer to the
chunk addresses actually do have the chunks in their local stores.
//We need two iterations, one to subscribe to the subscription events
//(so we know when setup phase is finished), and one to
//actually run the stream subscriptions. We can't do it in the same iteration,
//because while the first nodes in the loop are setting up subscriptions,
//the latter ones have not subscribed to listen to peer events yet,
//and then we miss events.
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
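'Direct subscribe' means the test drives syncing itself: for every kademlia bin of a node it issues a RequestSubscription to the corresponding peer and counts how many were sent, then waits for that many SubscribeMsg events. A condensed, hedged restatement of the startSyncing helper defined further down (requestSyncSubscriptions is an illustrative name):

// sketch only: condensed from startSyncing below
func requestSyncSubscriptions(r *Registry, kad *network.Kademlia, conf *synctestConfig) (int, error) {
    var err error
    subCnt := 0
    kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
        //subscribe to the SYNC stream for this bin, with history
        subCnt++
        err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())],
            NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), &Range{}, Top)
        return err == nil
    })
    return subCnt, err
}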
func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
//first iteration: setup disconnection watcher and subscribe to peer events
for j, id := range ids {
log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
client, err := net.GetNode(id).Client()
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
return err
return nil, nil, err
}
wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
// doneC is nil; the error has already been sent to the errc channel
if wsDoneC == nil {
continue
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
rpcSubscriptionsWg.Add(1)
go func() {
<-wsDoneC
rpcSubscriptionsWg.Done()
}()
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
bucket.Store(bucketKeyRegistry, r)
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
},
})
defer sim.Close()
ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelSimRun()
conf := &synctestConfig{}
//map of discover ID to indexes of chunks expected at that ID
conf.idToChunksMap = make(map[discover.NodeID][]int)
//map of overlay address to discover ID
conf.addrToIDMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
//watch for peers disconnecting
wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
return err
}
rpcSubscriptionsWg.Add(1)
go func() {
<-wdDoneC
rpcSubscriptionsWg.Done()
}()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
a := network.ToOverlayAddr(n.Bytes())
//append it to the array of all overlay addresses
conf.addrs = append(conf.addrs, a)
//the proximity calculation is on overlay addr,
//the p2p/simulations check func triggers on discover.NodeID,
//so we need to know which overlay addr maps to which nodeID
conf.addrToIDMap[string(a)] = n
}
//second iteration: start syncing
for j, id := range ids {
var subscriptionCount int
filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4)
eventC := sim.PeerEvents(ctx, nodeIDs, filter)
for j, node := range nodeIDs {
log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
client, err := net.GetNode(id).Client()
if err != nil {
return err
}
//start syncing!
item, ok := sim.NodeItem(node, bucketKeyRegistry)
if !ok {
return fmt.Errorf("No registry")
}
registry := item.(*Registry)
var cnt int
err = client.CallContext(ctx, &cnt, "stream_startSyncing")
cnt, err = startSyncing(registry, conf)
if err != nil {
return err
}
@ -370,57 +360,50 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
subscriptionCount += cnt
}
//now wait until the expected number of subscriptions has been reached
//`watchSubscriptionEvents` will write a `nil` value to errc
for err := range errc {
if err != nil {
return err
for e := range eventC {
if e.Error != nil {
return e.Error
}
//`nil` received, decrement count
subscriptionCount--
//all subscriptions received
if subscriptionCount == 0 {
break
}
}
log.Info("Stream subscriptions successfully requested")
if live {
//now upload the chunks to the selected random single node
hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount)
//select a random node for upload
node := sim.RandomUpNode()
item, ok := sim.NodeItem(node.ID, bucketKeyStore)
if !ok {
return fmt.Errorf("No localstore")
}
lstore := item.(*storage.LocalStore)
hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
if err != nil {
return err
}
conf.hashes = append(conf.hashes, hashes...)
//finally map chunks to the closest addresses
log.Debug(fmt.Sprintf("Uploaded chunks for live syncing: %v", conf.hashes))
mapKeysToNodes(conf)
log.Info(fmt.Sprintf("Uploaded %d chunks to random single node", chunkCount))
}
log.Info("Action terminated")
return nil
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
//check defines what will be checked during the test
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case e := <-disconnectC:
log.Error(e.Error())
return false, fmt.Errorf("Disconnect event detected, network unhealthy")
default:
var gDir string
var globalStore *mockdb.GlobalStore
if *useMockStore {
gDir, globalStore, err = createGlobalStore()
if err != nil {
return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
}
log.Trace(fmt.Sprintf("Checking node: %s", id))
//select the local store for the given node
//if there are more than one chunk, test only succeeds if all expected chunks are found
allSuccess := true
//all the chunk indexes which are supposed to be found for this node
localChunks := conf.idToChunksMap[id]
defer os.RemoveAll(gDir)
}
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
for !allSuccess {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
localSuccess := true
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
@ -428,59 +411,38 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
//check if the expected chunk is indeed in the localstore
var err error
if *useMockStore {
if globalStore == nil {
return false, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
}
//use the globalStore if the mockStore should be used; in that case,
//the complete localStore stack is bypassed for getting the chunk
_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
} else {
//use the actual localstore
lstore := stores[id]
_, err = lstore.Get(context.TODO(), chunk)
item, ok := sim.NodeItem(id, bucketKeyStore)
if !ok {
return fmt.Errorf("Error accessing localstore")
}
lstore := item.(*storage.LocalStore)
_, err = lstore.Get(ctx, chunk)
}
if err != nil {
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
allSuccess = false
localSuccess = false
} else {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
return allSuccess, nil
allSuccess = localSuccess
}
//for each tick, run the checks on all nodes
timingTicker := time.NewTicker(time.Second * 1)
defer timingTicker.Stop()
go func() {
for range timingTicker.C {
for i := 0; i < len(ids); i++ {
log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
trigger <- ids[i]
}
if !allSuccess {
return fmt.Errorf("Not all chunks succeeded!")
}
}()
log.Info("Starting simulation run...")
timeout := MaxTimeout * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
//run the simulation
result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
Action: action,
Trigger: trigger,
Expect: &simulations.Expectation{
Nodes: ids,
Check: check,
},
return nil
})
if result.Error != nil {
return result.Error
}
log.Info("Simulation terminated")
return nil
}
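Both syncing tests finish with the same per-node presence check, switching between the mock global store and the node's LocalStore. Condensed into a single hedged helper for clarity (nodeHasChunk is an illustrative name; *useMockStore, bucketKeyStore and the store calls are the identifiers already used above):

// sketch only: the shared per-node chunk presence check
func nodeHasChunk(ctx context.Context, sim *simulation.Simulation, globalStore *mockdb.GlobalStore, id discover.NodeID, chunk storage.Address) (bool, error) {
    if *useMockStore {
        //the complete localStore stack is bypassed for getting the chunk
        _, err := globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
        return err == nil, nil
    }
    item, ok := sim.NodeItem(id, bucketKeyStore)
    if !ok {
        return false, fmt.Errorf("error accessing localstore")
    }
    lstore := item.(*storage.LocalStore)
    _, err := lstore.Get(ctx, chunk)
    return err == nil, nil
}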
@ -489,20 +451,9 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
//the kademlia's `EachBin` function.
//returns the number of subscriptions requested
func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) {
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
var err error
if log.Lvl(*loglevel) == log.LvlDebug {
//PeerPot for this node
addr := common.Bytes2Hex(r.addr.OAddr)
pp := ppmap[addr]
//call Healthy RPC
h := r.delivery.overlay.Healthy(pp)
//print info
log.Debug(r.delivery.overlay.String())
log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
}
kad, ok := r.delivery.overlay.(*network.Kademlia)
if !ok {
return 0, fmt.Errorf("Not a Kademlia!")
@ -512,14 +463,10 @@ func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) {
//iterate over each bin and solicit needed subscription to bins
kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
//identify begin and start index of the bin(s) we want to subscribe to
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), conf.addrToIdMap[string(conn.Address())], po))
var histRange *Range
if history {
histRange = &Range{}
}
histRange := &Range{}
subCnt++
err = r.RequestSubscription(conf.addrToIdMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), live), histRange, Top)
err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top)
if err != nil {
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
return false
@ -552,7 +499,7 @@ func mapKeysToNodes(conf *synctestConfig) {
return false
}
if pl == 256 || pl == po {
log.Trace(fmt.Sprintf("appending %s", conf.addrToIdMap[string(a)]))
log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)]))
nns = append(nns, indexmap[string(a)])
nodemap[string(a)] = append(nodemap[string(a)], i)
}
@ -567,26 +514,24 @@ func mapKeysToNodes(conf *synctestConfig) {
}
for addr, chunks := range nodemap {
//this selects which chunks are expected to be found with the given node
conf.idToChunksMap[conf.addrToIdMap[addr]] = chunks
conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
}
log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
conf.chunksToNodesMap = kmap
}
//upload a file(chunks) to a single local node store
func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.Address, error) {
func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) {
log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
lstore := stores[id]
size := chunkSize
fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
size := chunkSize
var rootAddrs []storage.Address
for i := 0; i < chunkCount; i++ {
ctx := context.TODO()
rk, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
rk, wait, err := fileStore.Store(context.TODO(), io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
return nil, err
}
err = wait(ctx)
err = wait(context.TODO())
if err != nil {
return nil, err
}
@ -595,129 +540,3 @@ func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.
return rootAddrs, nil
}
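For reference, this is how the tests above drive the helper from inside sim.Run: pick a random up node, fetch its LocalStore from the bucket, upload, then map the resulting hashes to their closest nodes (a condensed restatement of the call sites above, not new API):

//sketch of the upload step inside sim.Run
node := sim.RandomUpNode()
item, ok := sim.NodeItem(node.ID, bucketKeyStore)
if !ok {
    return fmt.Errorf("No localstore")
}
lstore := item.(*storage.LocalStore)
hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
if err != nil {
    return err
}
conf.hashes = append(conf.hashes, hashes...)
mapKeysToNodes(conf)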
//initialize a network from a snapshot
func initNetWithSnapshot(nodeCount int) (*simulations.Network, error) {
var a adapters.NodeAdapter
//add the streamer service to the node adapter
if *adapter == "exec" {
dirname, err := ioutil.TempDir(".", "")
if err != nil {
return nil, err
}
a = adapters.NewExecAdapter(dirname)
} else if *adapter == "tcp" {
a = adapters.NewTCPAdapter(services)
} else if *adapter == "sim" {
a = adapters.NewSimAdapter(services)
}
log.Info("Setting up Snapshot network")
net := simulations.NewNetwork(a, &simulations.NetworkConfig{
ID: "0",
DefaultService: "streamer",
})
f, err := os.Open(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
return nil, err
}
defer f.Close()
jsonbyte, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
var snap simulations.Snapshot
err = json.Unmarshal(jsonbyte, &snap)
if err != nil {
return nil, err
}
//the snapshot probably has the property EnableMsgEvents not set
//just in case, set it to true!
//(we need this to wait for messages before uploading)
for _, n := range snap.Nodes {
n.Node.Config.EnableMsgEvents = true
}
log.Info("Waiting for p2p connections to be established...")
//now we can load the snapshot
err = net.Load(&snap)
if err != nil {
return nil, err
}
log.Info("Snapshot loaded")
return net, nil
}
//we want to wait for subscriptions to be established before uploading to test
//that live syncing is working correctly
func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}) {
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
if err != nil {
log.Error(err.Error())
errc <- fmt.Errorf("error getting peer events for node %v: %s", id, err)
return
}
c := make(chan struct{})
go func() {
defer func() {
log.Trace("watch subscription events: unsubscribe", "id", id)
sub.Unsubscribe()
close(c)
}()
for {
select {
case <-quitC:
return
case <-ctx.Done():
select {
case errc <- ctx.Err():
case <-quitC:
}
return
case e := <-events:
//just catch SubscribeMsg
if e.Type == p2p.PeerEventTypeMsgRecv && e.Protocol == "stream" && e.MsgCode != nil && *e.MsgCode == 4 {
errc <- nil
}
case err := <-sub.Err():
if err != nil {
select {
case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err):
case <-quitC:
}
return
}
}
}
}()
return c
}
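In the migrated tests this helper is replaced by the simulation package's event filters: instead of subscribing to admin peerEvents on each node, the test asks the framework for a pre-filtered channel and counts SubscribeMsg events (MsgCode 4 on the "stream" protocol, as the code above notes). A hedged sketch of that replacement pattern (waitForSubscriptions is an illustrative name):

// sketch only: wait for the expected number of SubscribeMsg events
func waitForSubscriptions(ctx context.Context, sim *simulation.Simulation, expected int) error {
    filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4)
    eventC := sim.PeerEvents(ctx, sim.UpNodeIDs(), filter)
    for e := range eventC {
        if e.Error != nil {
            return e.Error
        }
        //one SubscribeMsg received, decrement the count
        expected--
        if expected == 0 {
            return nil
        }
    }
    return ctx.Err()
}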
//create a local store for the given node
func createTestLocalStorageForId(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
var datadir string
var err error
datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
if err != nil {
return nil, err
}
datadirs[id] = datadir
var store storage.ChunkStore
params := storage.NewDefaultLocalStoreParams()
params.ChunkDbPath = datadir
params.BaseKey = addr.Over()
store, err = storage.NewTestLocalStoreForAddr(params)
if err != nil {
return nil, err
}
return store, nil
}
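The call sites in this diff use a three-value variant, createTestLocalStorageForID(id, addr), which also returns the data directory so each node's cleanup closure can remove it, instead of recording it in the global datadirs map. A hedged sketch of what that variant presumably looks like, built only from the storage calls in the helper above:

// sketch only: local store creation returning the datadir for cleanup
func createTestLocalStorageForIDSketch(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) {
    datadir, err := ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
    if err != nil {
        return nil, "", err
    }
    params := storage.NewDefaultLocalStoreParams()
    params.ChunkDbPath = datadir
    params.BaseKey = addr.Over()
    store, err := storage.NewTestLocalStoreForAddr(params)
    if err != nil {
        return nil, "", err
    }
    return store, datadir, nil
}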
@ -23,18 +23,22 @@ import (
"io"
"io/ioutil"
"math"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
const dataChunkCount = 200
@ -46,192 +50,174 @@ func TestSyncerSimulation(t *testing.T) {
testSyncBetweenNodes(t, 16, 1, dataChunkCount, true, 1)
}
func createMockStore(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
var err error
func createMockStore(globalStore *mockdb.GlobalStore, id discover.NodeID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
address := common.BytesToAddress(id.Bytes())
mockStore := globalStore.NewNodeStore(address)
params := storage.NewDefaultLocalStoreParams()
datadirs[id], err = ioutil.TempDir("", "localMockStore-"+id.TerminalString())
datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString())
if err != nil {
return nil, err
return nil, "", err
}
params.Init(datadirs[id])
params.Init(datadir)
params.BaseKey = addr.Over()
lstore, err := storage.NewLocalStore(params, mockStore)
return lstore, nil
lstore, err = storage.NewLocalStore(params, mockStore)
return lstore, datadir, nil
}
func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool, po uint8) {
defer setDefaultSkipCheck(defaultSkipCheck)
defaultSkipCheck = skipCheck
//data directories for each node and store
datadirs = make(map[discover.NodeID]string)
if *useMockStore {
createStoreFunc = createMockStore
createGlobalStore()
} else {
createStoreFunc = createTestLocalStorageFromSim
}
defer datadirsCleanup()
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
var store storage.ChunkStore
var globalStore *mockdb.GlobalStore
var gDir, datadir string
registries = make(map[discover.NodeID]*TestRegistry)
toAddr = func(id discover.NodeID) *network.BzzAddr {
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
//hack to put addresses in same space
addr.OAddr[0] = byte(0)
return addr
if *useMockStore {
gDir, globalStore, err = createGlobalStore()
if err != nil {
return nil, nil, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
}
conf := &streamTesting.RunConfig{
Adapter: *adapter,
NodeCount: nodes,
ConnLevel: conns,
ToAddr: toAddr,
Services: services,
EnableMsgEvents: false,
store, datadir, err = createMockStore(globalStore, id, addr)
} else {
store, datadir, err = createTestLocalStorageForID(id, addr)
}
// HACK: these are global variables in the test so that they are available for
// the service constructor function
// TODO: will this work with exec/docker adapter?
// localstore of nodes made available for action and check calls
stores = make(map[discover.NodeID]storage.ChunkStore)
deliveries = make(map[discover.NodeID]*Delivery)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
if *useMockStore {
err := globalStore.Close()
if err != nil {
log.Error("Error closing global store! %v", "err", err)
}
os.RemoveAll(gDir)
}
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
bucket.Store(bucketKeyDB, db)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
},
})
defer sim.Close()
// create context for simulation run
timeout := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// defer cancel should come before defer simulation teardown
defer cancel()
// create simulation network with the config
sim, teardown, err := streamTesting.NewSimulation(conf)
var rpcSubscriptionsWg sync.WaitGroup
defer func() {
rpcSubscriptionsWg.Wait()
teardown()
}()
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err.Error())
t.Fatal(err)
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
nodeIndex := make(map[discover.NodeID]int)
for i, id := range sim.IDs {
for i, id := range nodeIDs {
nodeIndex[id] = i
if !*useMockStore {
stores[id] = sim.Stores[i]
sim.Stores[i] = stores[id]
}
}
// peerCount function gives the number of peer connections for a nodeID
// this is needed for the service run function to wait until
// each protocol instance runs and the streamer peers are available
peerCount = func(id discover.NodeID) int {
if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
return 1
}
return 2
}
waitPeerErrC = make(chan error)
// create DBAPI-s for all nodes
dbs := make([]*storage.DBAPI, nodes)
for i := 0; i < nodes; i++ {
dbs[i] = storage.NewDBAPI(sim.Stores[i].(*storage.LocalStore))
}
disconnections := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
// collect hashes in po 1 bin for each node
hashes := make([][]storage.Address, nodes)
totalHashes := 0
hashCounts := make([]int, nodes)
for i := nodes - 1; i >= 0; i-- {
if i < nodes-1 {
hashCounts[i] = hashCounts[i+1]
go func() {
for d := range disconnections {
if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
t.Fatal(d.Error)
}
dbs[i].Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
return true
})
}
}()
// errc is error channel for simulation
errc := make(chan error, 1)
quitC := make(chan struct{})
defer close(quitC)
// action is subscribe
action := func(ctx context.Context) error {
// need to wait till an asynchronous process registers the peers in streamer.peers
// that is used by Subscribe
// the global peerCount function tells how many connections each node has
// TODO: this is to be reimplemented with peerEvent watcher without global var
i := 0
for err := range waitPeerErrC {
if err != nil {
return fmt.Errorf("error waiting for peers: %s", err)
}
i++
if i == nodes {
break
}
}
// each node Subscribes to each other's swarmChunkServerStreamName
for j := 0; j < nodes-1; j++ {
id := sim.IDs[j]
sim.Stores[j] = stores[id]
err := sim.CallClient(id, func(client *rpc.Client) error {
// report disconnect events to the error channel because peers should not disconnect
doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
id := nodeIDs[j]
client, err := sim.Net.GetNode(id).Client()
if err != nil {
return err
t.Fatal(err)
}
rpcSubscriptionsWg.Add(1)
go func() {
<-doneC
rpcSubscriptionsWg.Done()
}()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// start syncing, i.e., subscribe to upstream peers po 1 bin
sid := sim.IDs[j+1]
return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
})
sid := nodeIDs[j+1]
client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
if err != nil {
return err
}
if j > 0 || nodes == 2 {
item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
// here we distribute chunks of a random file into stores 1...nodes
rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
fileStore := item.(*storage.FileStore)
size := chunkCount * chunkSize
_, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
_, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
t.Fatal(err.Error())
}
// need to wait cos we then immediately collect the relevant bin content
wait(ctx)
if err != nil {
t.Fatal(err.Error())
}
return nil
}
// this makes sure check is not called before the previous call finishes
check := func(ctx context.Context, id discover.NodeID) (bool, error) {
select {
case err := <-errc:
return false, err
case <-ctx.Done():
return false, ctx.Err()
default:
// here we distribute chunks of a random file into stores 1...nodes
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
return err
}
i := nodeIndex[id]
// collect hashes in po 1 bin for each node
hashes := make([][]storage.Address, nodes)
totalHashes := 0
hashCounts := make([]int, nodes)
for i := nodes - 1; i >= 0; i-- {
if i < nodes-1 {
hashCounts[i] = hashCounts[i+1]
}
item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB)
if !ok {
return fmt.Errorf("No DB")
}
db := item.(*storage.DBAPI)
db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
return true
})
}
var total, found int
for _, node := range nodeIDs {
i := nodeIndex[node]
for j := i; j < nodes; j++ {
total += len(hashes[j])
for _, key := range hashes[j] {
chunk, err := dbs[i].Get(ctx, key)
item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB)
if !ok {
return fmt.Errorf("No DB")
}
db := item.(*storage.DBAPI)
chunk, err := db.Get(ctx, key)
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
@ -242,26 +228,15 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
found++
}
}
log.Debug("sync check", "node", id, "index", i, "bin", po, "found", found, "total", total)
return total == found, nil
}
conf.Step = &simulations.Step{
Action: action,
Trigger: streamTesting.Trigger(500*time.Millisecond, quitC, sim.IDs[0:nodes-1]...),
Expect: &simulations.Expectation{
Nodes: sim.IDs[0:1],
Check: check,
},
log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
}
startedAt := time.Now()
result, err := sim.Run(ctx, conf)
finishedAt := time.Now()
if err != nil {
t.Fatalf("Setting up simulation failed: %v", err)
if total == found && total > 0 {
return nil
}
return fmt.Errorf("Total not equallying found: total is %d", total)
})
if result.Error != nil {
t.Fatalf("Simulation failed: %s", result.Error)
t.Fatal(result.Error)
}
streamTesting.CheckResult(t, result, startedAt, finishedAt)
}
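For orientation, the migrated stream tests all follow the same netsim skeleton as the test above: register a ServiceFunc, build a topology, then run the whole scenario inside one Run closure. The snippet below is a minimal, self-contained sketch of that skeleton, not part of this change; the no-op service and the "example" bucket key are illustrative stand-ins for the stream Registry and the bucketKeyDB / bucketKeyFileStore wiring done in common_test.go, and it assumes the simulation helpers used elsewhere in this change (AddNodesAndConnectChain, UpNodeIDs, NodeItem).

package example

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
)

var bucketKeyExample = simulation.BucketKey("example")

// noopService satisfies node.Service; the real tests return the stream
// Registry here and expose its RPC API.
type noopService struct{}

func (noopService) APIs() []rpc.API         { return nil }
func (noopService) Start(*p2p.Server) error { return nil }
func (noopService) Stop() error             { return nil }

func (noopService) Protocols() []p2p.Protocol {
	return []p2p.Protocol{{
		Name:    "noop",
		Version: 1,
		Length:  1,
		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
			// block until the peer disconnects so the connection stays up
			_, err := rw.ReadMsg()
			return err
		},
	}}
}

func TestNetsimSkeleton(t *testing.T) {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
			// per-node state goes into the bucket so the Run closure
			// can read it back with sim.NodeItem
			bucket.Store(bucketKeyExample, ctx.Config.ID.String())
			cleanup := func() {}
			return noopService{}, cleanup, nil
		},
	})
	defer sim.Close()

	// start the nodes and connect them in a chain
	if _, err := sim.AddNodesAndConnectChain(4); err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// the whole scenario runs inside one closure; a non-nil return marks failure
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		for _, id := range sim.UpNodeIDs() {
			if _, ok := sim.NodeItem(id, bucketKeyExample); !ok {
				return fmt.Errorf("no bucket item for node %s", id)
			}
		}
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}
}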

@ -1,293 +0,0 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"context"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/storage"
)
type Simulation struct {
Net *simulations.Network
Stores []storage.ChunkStore
Addrs []network.Addr
IDs []discover.NodeID
}
func SetStores(addrs ...network.Addr) ([]storage.ChunkStore, func(), error) {
var datadirs []string
stores := make([]storage.ChunkStore, len(addrs))
var err error
for i, addr := range addrs {
var datadir string
datadir, err = ioutil.TempDir("", "streamer")
if err != nil {
break
}
var store storage.ChunkStore
params := storage.NewDefaultLocalStoreParams()
params.Init(datadir)
params.BaseKey = addr.Over()
store, err = storage.NewTestLocalStoreForAddr(params)
if err != nil {
break
}
datadirs = append(datadirs, datadir)
stores[i] = store
}
teardown := func() {
for i, datadir := range datadirs {
stores[i].Close()
os.RemoveAll(datadir)
}
}
return stores, teardown, err
}
func NewAdapter(adapterType string, services adapters.Services) (adapter adapters.NodeAdapter, teardown func(), err error) {
teardown = func() {}
switch adapterType {
case "sim":
adapter = adapters.NewSimAdapter(services)
case "exec":
baseDir, err0 := ioutil.TempDir("", "swarm-test")
if err0 != nil {
return nil, teardown, err0
}
teardown = func() { os.RemoveAll(baseDir) }
adapter = adapters.NewExecAdapter(baseDir)
case "docker":
adapter, err = adapters.NewDockerAdapter()
if err != nil {
return nil, teardown, err
}
default:
return nil, teardown, errors.New("adapter needs to be one of sim, exec, docker")
}
return adapter, teardown, nil
}
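Callers select the adapter with a plain string; a rough sketch of the typical in-process setup (the "sim" adapter, with services already assembled by the test) looks like:

	adapter, teardown, err := NewAdapter("sim", services)
	if err != nil {
		t.Fatal(err)
	}
	defer teardown()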
func CheckResult(t *testing.T, result *simulations.StepResult, startedAt, finishedAt time.Time) {
t.Logf("Simulation passed in %s", result.FinishedAt.Sub(result.StartedAt))
if len(result.Passes) > 1 {
var min, max time.Duration
var sum int
for _, pass := range result.Passes {
duration := pass.Sub(result.StartedAt)
if sum == 0 || duration < min {
min = duration
}
if duration > max {
max = duration
}
sum += int(duration.Nanoseconds())
}
t.Logf("Min: %s, Max: %s, Average: %s", min, max, time.Duration(sum/len(result.Passes))*time.Nanosecond)
}
t.Logf("Setup: %s, Shutdown: %s", result.StartedAt.Sub(startedAt), finishedAt.Sub(result.FinishedAt))
}
type RunConfig struct {
Adapter string
Step *simulations.Step
NodeCount int
ConnLevel int
ToAddr func(discover.NodeID) *network.BzzAddr
Services adapters.Services
DefaultService string
EnableMsgEvents bool
}
func NewSimulation(conf *RunConfig) (*Simulation, func(), error) {
// create network
nodes := conf.NodeCount
adapter, adapterTeardown, err := NewAdapter(conf.Adapter, conf.Services)
if err != nil {
return nil, adapterTeardown, err
}
defaultService := "streamer"
if conf.DefaultService != "" {
defaultService = conf.DefaultService
}
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
ID: "0",
DefaultService: defaultService,
})
teardown := func() {
adapterTeardown()
net.Shutdown()
}
ids := make([]discover.NodeID, nodes)
addrs := make([]network.Addr, nodes)
// start nodes
for i := 0; i < nodes; i++ {
nodeconf := adapters.RandomNodeConfig()
nodeconf.EnableMsgEvents = conf.EnableMsgEvents
node, err := net.NewNodeWithConfig(nodeconf)
if err != nil {
return nil, teardown, fmt.Errorf("error creating node: %s", err)
}
ids[i] = node.ID()
addrs[i] = conf.ToAddr(ids[i])
}
// create one chunk store per node address
stores, storeTeardown, err := SetStores(addrs...)
teardown = func() {
net.Shutdown()
adapterTeardown()
storeTeardown()
}
if err != nil {
return nil, teardown, err
}
s := &Simulation{
Net: net,
Stores: stores,
IDs: ids,
Addrs: addrs,
}
return s, teardown, nil
}
func (s *Simulation) Run(ctx context.Context, conf *RunConfig) (*simulations.StepResult, error) {
// bring up nodes, launch the service
nodes := conf.NodeCount
conns := conf.ConnLevel
for i := 0; i < nodes; i++ {
if err := s.Net.Start(s.IDs[i]); err != nil {
return nil, fmt.Errorf("error starting node %s: %s", s.IDs[i].TerminalString(), err)
}
}
// connect the nodes in a chain, adding extra random connections when ConnLevel > 1
wg := sync.WaitGroup{}
for i := range s.IDs {
// pick conns peers for this node: its predecessor first, then random ones
for j := 0; j < conns; j++ {
var k int
if j == 0 {
k = i - 1
} else {
k = rand.Intn(len(s.IDs))
}
if i > 0 {
wg.Add(1)
go func(i, k int) {
defer wg.Done()
s.Net.Connect(s.IDs[i], s.IDs[k])
}(i, k)
}
}
}
wg.Wait()
log.Info(fmt.Sprintf("simulation with %v nodes", len(s.Addrs)))
// create an only locally retrieving FileStore for the pivot node to test
// if retrieve requests have arrived
result := simulations.NewSimulation(s.Net).Run(ctx, conf.Step)
return result, nil
}
// WatchDisconnections subscribes to admin peerEvents and sends peer event drop
// errors to the errc channel. Channel quitC signals the termination of the event loop.
// Returned doneC will be closed after the rpc subscription is unsubscribed,
// signaling that simulations network is safe to shutdown.
func WatchDisconnections(id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}, err error) {
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
if err != nil {
return nil, fmt.Errorf("error getting peer events for node %v: %s", id, err)
}
c := make(chan struct{})
go func() {
defer func() {
log.Trace("watch disconnections: unsubscribe", "id", id)
sub.Unsubscribe()
close(c)
}()
for {
select {
case <-quitC:
return
case e := <-events:
if e.Type == p2p.PeerEventTypeDrop {
select {
case errc <- fmt.Errorf("peerEvent for node %v: %v", id, e):
case <-quitC:
return
}
}
case err := <-sub.Err():
if err != nil {
select {
case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err):
case <-quitC:
return
}
}
}
}
}()
return c, nil
}
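Before this change, the stream tests consumed this helper roughly as in the following sketch; errc, quitC and rpcSubscriptionsWg follow the pre-migration pattern, and sim here is this package's Simulation, not the new netsim one:

	errc := make(chan error, 1)
	quitC := make(chan struct{})
	defer close(quitC)

	var rpcSubscriptionsWg sync.WaitGroup
	err := sim.CallClient(id, func(client *rpc.Client) error {
		// report disconnect events to the error channel: peers should not disconnect
		doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
		if err != nil {
			return err
		}
		// hold the simulation open until the subscription is unsubscribed
		rpcSubscriptionsWg.Add(1)
		go func() {
			<-doneC
			rpcSubscriptionsWg.Done()
		}()
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}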
func Trigger(d time.Duration, quitC chan struct{}, ids ...discover.NodeID) chan discover.NodeID {
trigger := make(chan discover.NodeID)
go func() {
defer close(trigger)
ticker := time.NewTicker(d)
defer ticker.Stop()
// periodically re-trigger checks for the given node ids
for range ticker.C {
for _, id := range ids {
select {
case trigger <- id:
case <-quitC:
return
}
}
}
}()
return trigger
}
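Trigger fed the expectation loop of a simulations.Step; the pre-migration tests wired it up roughly as in this sketch, where action and check are the step's action and expectation functions:

	conf.Step = &simulations.Step{
		Action:  action,
		Trigger: streamTesting.Trigger(500*time.Millisecond, quitC, sim.IDs[0:nodes-1]...),
		Expect: &simulations.Expectation{
			Nodes: sim.IDs[0:1],
			Check: check,
		},
	}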
func (sim *Simulation) CallClient(id discover.NodeID, f func(*rpc.Client) error) error {
node := sim.Net.GetNode(id)
if node == nil {
return fmt.Errorf("unknown node: %s", id)
}
client, err := node.Client()
if err != nil {
return fmt.Errorf("error getting node client: %s", err)
}
return f(client)
}