diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 7b29626084..3b6e4a9465 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -26,16 +26,19 @@ import ( "math/rand" "os" "strings" + "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" ) @@ -66,6 +69,80 @@ func init() { log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) } +// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations +func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) { + addr := network.NewAddr(ctx.Config.Node()) + + netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) + if err != nil { + return nil, nil, nil, nil, err + } + + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + return addr, netStore, delivery, cleanup, nil +} + +// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr +func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) { + netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) + if err != nil { + return nil, nil, nil, err + } + + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + return netStore, delivery, cleanup, nil +} + +// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc +func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) { + addr := network.NewAddr(ctx.Config.Node()) + + netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) + if err != nil { + return nil, nil, nil, nil, err + } + + netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New + + return addr, netStore, delivery, cleanup, nil +} + +func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) { + n := ctx.Config.Node() + + store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + if *useMockStore { + store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr) + } + if err != nil { + return nil, nil, nil, err + } + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, nil, err + } + + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + + bucket.Store(bucketKeyStore, store) + bucket.Store(bucketKeyDB, netStore) + bucket.Store(bucketKeyDelivery, delivery) + bucket.Store(bucketKeyFileStore, fileStore) + + cleanup := func() { + netStore.Close() + os.RemoveAll(datadir) + } + + return netStore, delivery, cleanup, nil +} + func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 6f1ddc6590..cb7690f3ef 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "os" "sync" "sync/atomic" "testing" @@ -457,27 +456,11 @@ func TestDeliveryFromNodes(t *testing.T) { func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - node := ctx.Config.Node() - addr := network.NewAddr(node) - store, datadir, err := createTestLocalStorageForID(node.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, @@ -485,11 +468,12 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) }, nil) bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -644,25 +628,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - node := ctx.Config.Node() - addr := network.NewAddr(node) - store, datadir, err := createTestLocalStorageForID(node.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, @@ -670,12 +639,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }, nil) + bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 8f2bed9d64..248ba0c846 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -21,7 +21,6 @@ import ( "encoding/binary" "errors" "fmt" - "os" "sync" "sync/atomic" "testing" @@ -31,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { externalStreamMaxKeys := uint64(100) sim := simulation.New(map[string]simulation.ServiceFunc{ - "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, @@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil }) - fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup := func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index d345ac8d02..f097e4180a 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -18,7 +18,6 @@ package stream import ( "context" "fmt" - "os" "sync" "testing" "time" @@ -27,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -105,43 +103,25 @@ func TestRetrieval(t *testing.T) { } var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ - "streamer": retrievalStreamerFunc, -} - -func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingAutoSubscribe, - SyncUpdateDelay: 3 * time.Second, - }, nil) + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalEnabled, + Syncing: SyncingAutoSubscribe, + SyncUpdateDelay: 3 * time.Second, + }, nil) - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } + cleanup = func() { + r.Close() + clean() + } - return r, cleanup, nil + return r, cleanup, nil + }, } /* diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 6af19c12ab..c32ed7d077 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -107,42 +107,27 @@ func TestSyncingViaGlobalSync(t *testing.T) { } var simServiceMap = map[string]simulation.ServiceFunc{ - "streamer": streamerFunc, -} + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) + if err != nil { + return nil, nil, err + } -func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, - SyncUpdateDelay: 3 * time.Second, - }, nil) - - bucket.Store(bucketKeyRegistry, r) - - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, + SyncUpdateDelay: 3 * time.Second, + }, nil) - return r, cleanup, nil + bucket.Store(bucketKeyRegistry, r) + + cleanup = func() { + r.Close() + clean() + } + return r, cleanup, nil + }, } func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index b83521f060..c2aee61b73 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "os" "strconv" "strings" "sync" @@ -37,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" - "github.com/ethereum/go-ethereum/swarm/storage" "golang.org/x/crypto/sha3" ) @@ -1209,26 +1207,18 @@ func TestGetSubscriptionsRPC(t *testing.T) { // create a standard sim sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) if err != nil { return nil, nil, err } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New + // configure so that sync registrations actually happen r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, //enable sync registrations SyncUpdateDelay: syncUpdateDelay, }, nil) + // get the SubscribeMsg code subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{}) if !ok { @@ -1236,13 +1226,11 @@ func TestGetSubscriptionsRPC(t *testing.T) { } cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() r.Close() + clean() } return r, cleanup, nil - }, }) defer sim.Close() @@ -1322,9 +1310,9 @@ func TestGetSubscriptionsRPC(t *testing.T) { t.Fatal(err) } //length of the subscriptions can not be smaller than number of peers - log.Debug("node subscriptions:", "node", node.String()) + log.Debug("node subscriptions", "node", node.String()) for p, ps := range pstreams { - log.Debug("... with: ", "peer", p) + log.Debug("... with", "peer", p) for _, s := range ps { log.Debug(".......", "stream", s) // each node also has subscriptions to RETRIEVE_REQUEST streams, diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 014ec9a982..5656963d9c 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -22,7 +22,6 @@ import ( "fmt" "io/ioutil" "math" - "os" "sync" "sync/atomic" "testing" @@ -38,7 +37,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/mock" - mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -73,38 +71,14 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - var store storage.ChunkStore - var datadir string - - node := ctx.Config.Node() - addr := network.NewAddr(node) + addr := network.NewAddr(ctx.Config.Node()) //hack to put addresses in same space addr.OAddr[0] = byte(0) - if *useMockStore { - store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr) - } else { - store, datadir, err = createTestLocalStorageForID(node.ID(), addr) - } - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) + netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyDB, netStore) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, @@ -112,11 +86,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p SkipCheck: skipCheck, }, nil) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) - bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -251,44 +226,26 @@ func TestSameVersionID(t *testing.T) { v := uint(1) sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - var store storage.ChunkStore - var datadir string - - node := ctx.Config.Node() - addr := network.NewAddr(node) - - store, datadir, err = createTestLocalStorageForID(node.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyDB, netStore) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, }, nil) + bucket.Store(bucketKeyRegistry, r) + //assign to each node the same version ID r.spec.Version = v - bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() @@ -333,46 +290,27 @@ func TestDifferentVersionID(t *testing.T) { v := uint(0) sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - var store storage.ChunkStore - var datadir string - - node := ctx.Config.Node() - addr := network.NewAddr(node) - - store, datadir, err = createTestLocalStorageForID(node.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - cleanup = func() { - store.Close() - os.RemoveAll(datadir) - } - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyDB, netStore) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - - bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, }, nil) + bucket.Store(bucketKeyRegistry, r) //increase the version ID for each node v++ r.spec.Version = v - bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + r.Close() + clean() + } return r, cleanup, nil - }, }) defer sim.Close() diff --git a/swarm/network/stream/testing/snapshot_4.json b/swarm/network/stream/testing/snapshot_4.json index a64f31375a..a8b6174074 100644 --- a/swarm/network/stream/testing/snapshot_4.json +++ b/swarm/network/stream/testing/snapshot_4.json @@ -1 +1 @@ -{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]} \ No newline at end of file +{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]} diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go index 3b7d0d7435..3694dd3117 100644 --- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go +++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "io" - "os" "sync" "testing" "time" @@ -37,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -169,21 +167,10 @@ func TestSnapshotSyncWithServer(t *testing.T) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) if err != nil { return nil, nil, err } - bucket.Store(bucketKeyStore, store) - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, @@ -199,9 +186,8 @@ func TestSnapshotSyncWithServer(t *testing.T) { bucket.Store(bucketKeyRegistry, tr) cleanup = func() { - netStore.Close() tr.Close() - os.RemoveAll(datadir) + clean() } return tr, cleanup, nil