cmd/swarm/swarm-smoke: refactor generateEndpoints (#19006)

(cherry picked from commit d212535ddd)
Branch: pull/19029/head

Authored by Anton Evangelatov 6 years ago; committed by Rafael Matias
parent 355d55bd34
commit 637a75d61a
6 files changed:

  cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 67
  cmd/swarm/swarm-smoke/main.go                 | 70
  cmd/swarm/swarm-smoke/sliding_window.go       | 56
  cmd/swarm/swarm-smoke/upload_and_sync.go      | 71
  cmd/swarm/swarm-smoke/upload_speed.go         | 43
  cmd/swarm/swarm-smoke/util.go                 | 98

cmd/swarm/swarm-smoke/feed_upload_and_sync.go

@@ -3,7 +3,6 @@ package main
 import (
     "bytes"
     "crypto/md5"
-    crand "crypto/rand"
     "fmt"
     "io"
     "io/ioutil"
@ -16,7 +15,9 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
"github.com/pborman/uuid" "github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
) )
@@ -25,13 +26,28 @@ const (
     feedRandomDataLength = 8
 )
 
-// TODO: retrieve with manifest + extract repeating code
-func feedUploadAndSync(c *cli.Context) error {
-    defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
+func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error {
+    errc := make(chan error)
 
-    generateEndpoints(scheme, cluster, appName, from, to)
+    go func() {
+        errc <- feedUploadAndSync(ctx, tuid)
+    }()
 
-    log.Info("generating and uploading feeds to " + endpoints[0] + " and syncing")
+    select {
+    case err := <-errc:
+        if err != nil {
+            metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
+        }
+        return err
+    case <-time.After(time.Duration(timeout) * time.Second):
+        metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
+        return fmt.Errorf("timeout after %v sec", timeout)
+    }
+}
+
+func feedUploadAndSync(c *cli.Context, tuid string) error {
+    log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing")
 
     // create a random private key to sign updates with and derive the address
     pkFile, err := ioutil.TempFile("", "swarm-feed-smoke-test")
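
Every refactored *Cmd wrapper in this commit follows the same shape: the actual test runs in a goroutine and is raced against a timeout, bumping a ".fail" or ".timeout" metrics counter as appropriate. A minimal standalone sketch of the pattern (the timeout value and the work closure are placeholders, not the tool's real body):

    package main

    import (
        "fmt"
        "time"
    )

    // runWithTimeout mirrors the wrapper above: run the real work in a
    // goroutine and race it against a deadline. On timeout the worker stays
    // blocked on the unbuffered channel; the smoke binary exits shortly
    // after, so that leak is accepted there (and reproduced here for fidelity).
    func runWithTimeout(timeout time.Duration, work func() error) error {
        errc := make(chan error)
        go func() {
            errc <- work()
        }()

        select {
        case err := <-errc:
            return err // nil on success; the tool also bumps a ".fail" counter here
        case <-time.After(timeout):
            return fmt.Errorf("timeout after %v", timeout)
        }
    }

    func main() {
        err := runWithTimeout(2*time.Second, func() error {
            time.Sleep(time.Second) // stand-in for the actual smoke-test body
            return nil
        })
        fmt.Println("result:", err) // result: <nil>
    }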
@@ -85,7 +101,7 @@ func feedUploadAndSync(c *cli.Context) error {
     // create feed manifest, topic only
     var out bytes.Buffer
-    cmd := exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--user", userHex)
+    cmd := exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--user", userHex)
     cmd.Stdout = &out
     log.Debug("create feed manifest topic cmd", "cmd", cmd)
     err = cmd.Run()
@@ -100,7 +116,7 @@ func feedUploadAndSync(c *cli.Context) error {
     out.Reset()
 
     // create feed manifest, subtopic only
-    cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--name", subTopicHex, "--user", userHex)
+    cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--name", subTopicHex, "--user", userHex)
     cmd.Stdout = &out
     log.Debug("create feed manifest subtopic cmd", "cmd", cmd)
     err = cmd.Run()
@@ -115,7 +131,7 @@ func feedUploadAndSync(c *cli.Context) error {
     out.Reset()
 
     // create feed manifest, merged topic
-    cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
+    cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
     cmd.Stdout = &out
     log.Debug("create feed manifest mergetopic cmd", "cmd", cmd)
     err = cmd.Run()
@@ -141,7 +157,7 @@ func feedUploadAndSync(c *cli.Context) error {
     dataHex := hexutil.Encode(data)
 
     // update with topic
-    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, dataHex)
+    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, dataHex)
     cmd.Stdout = &out
     log.Debug("update feed manifest topic cmd", "cmd", cmd)
     err = cmd.Run()
@@ -152,7 +168,7 @@ func feedUploadAndSync(c *cli.Context) error {
     out.Reset()
 
     // update with subtopic
-    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, dataHex)
+    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, dataHex)
     cmd.Stdout = &out
     log.Debug("update feed manifest subtopic cmd", "cmd", cmd)
     err = cmd.Run()
@@ -163,7 +179,7 @@ func feedUploadAndSync(c *cli.Context) error {
     out.Reset()
 
     // update with merged topic
-    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
+    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
     cmd.Stdout = &out
     log.Debug("update feed manifest merged topic cmd", "cmd", cmd)
     err = cmd.Run()
@@ -177,14 +193,14 @@ func feedUploadAndSync(c *cli.Context) error {
 
     // retrieve the data
     wg := sync.WaitGroup{}
-    for _, endpoint := range endpoints {
+    for _, host := range hosts {
         // raw retrieve, topic only
         for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} {
             wg.Add(1)
             ruid := uuid.New()[:8]
             go func(hex string, endpoint string, ruid string) {
                 for {
-                    err := fetchFeed(hex, userHex, endpoint, dataHash, ruid)
+                    err := fetchFeed(hex, userHex, httpEndpoint(host), dataHash, ruid)
                     if err != nil {
                         continue
                     }
@@ -192,21 +208,18 @@ func feedUploadAndSync(c *cli.Context) error {
                     wg.Done()
                     return
                 }
-            }(hex, endpoint, ruid)
+            }(hex, httpEndpoint(host), ruid)
         }
     }
 
     wg.Wait()
     log.Info("all endpoints synced random data successfully")
 
     // upload test file
-    seed := int(time.Now().UnixNano() / 1e6)
-    log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed)
+    log.Info("feed uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)
 
-    h = md5.New()
-    r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
+    randomBytes := testutil.RandomBytes(seed, filesize*1000)
 
-    hash, err := upload(r, filesize*1000, endpoints[0])
+    hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
     if err != nil {
         return err
     }
@@ -220,7 +233,7 @@ func feedUploadAndSync(c *cli.Context) error {
     log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))
 
     // update file with topic
-    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, multihashHex)
+    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, multihashHex)
     cmd.Stdout = &out
     err = cmd.Run()
     if err != nil {
@@ -230,7 +243,7 @@ func feedUploadAndSync(c *cli.Context) error {
     out.Reset()
 
     // update file with subtopic
-    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, multihashHex)
+    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, multihashHex)
     cmd.Stdout = &out
     err = cmd.Run()
     if err != nil {
@@ -240,7 +253,7 @@ func feedUploadAndSync(c *cli.Context) error {
     out.Reset()
 
     // update file with merged topic
-    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
+    cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
     cmd.Stdout = &out
     err = cmd.Run()
     if err != nil {
@@ -251,7 +264,7 @@ func feedUploadAndSync(c *cli.Context) error {
 
     time.Sleep(3 * time.Second)
 
-    for _, endpoint := range endpoints {
+    for _, host := range hosts {
         // manifest retrieve, topic only
         for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} {
             wg.Add(1)
@@ -259,7 +272,7 @@ func feedUploadAndSync(c *cli.Context) error {
             ruid := uuid.New()[:8]
             go func(url string, endpoint string, ruid string) {
                 for {
-                    err := fetch(url, endpoint, fileHash, ruid)
+                    err := fetch(url, endpoint, fileHash, ruid, "")
                     if err != nil {
                         continue
                     }
@@ -267,7 +280,7 @@ func feedUploadAndSync(c *cli.Context) error {
                     wg.Done()
                     return
                 }
-            }(url, endpoint, ruid)
+            }(url, httpEndpoint(host), ruid)
         }
     }

cmd/swarm/swarm-smoke/main.go

@@ -37,18 +37,15 @@ var (
 )
 
 var (
-    endpoints        []string
-    includeLocalhost bool
-    cluster          string
-    appName          string
-    scheme           string
-    filesize         int
-    syncDelay        int
-    from             int
-    to               int
-    verbosity        int
-    timeout          int
-    single           bool
+    allhosts  string
+    hosts     []string
+    filesize  int
+    syncDelay int
+    httpPort  int
+    wsPort    int
+    verbosity int
+    timeout   int
+    single    bool
 )
 
 func main() {
@ -59,39 +56,22 @@ func main() {
app.Flags = []cli.Flag{ app.Flags = []cli.Flag{
cli.StringFlag{ cli.StringFlag{
Name: "cluster-endpoint", Name: "hosts",
Value: "prod", Value: "",
Usage: "cluster to point to (prod or a given namespace)", Usage: "comma-separated list of swarm hosts",
Destination: &cluster, Destination: &allhosts,
},
cli.StringFlag{
Name: "app",
Value: "swarm",
Usage: "application to point to (swarm or swarm-private)",
Destination: &appName,
}, },
cli.IntFlag{ cli.IntFlag{
Name: "cluster-from", Name: "http-port",
Value: 8501, Value: 80,
Usage: "swarm node (from)", Usage: "http port",
Destination: &from, Destination: &httpPort,
}, },
cli.IntFlag{ cli.IntFlag{
Name: "cluster-to", Name: "ws-port",
Value: 8512, Value: 8546,
Usage: "swarm node (to)", Usage: "ws port",
Destination: &to, Destination: &wsPort,
},
cli.StringFlag{
Name: "cluster-scheme",
Value: "http",
Usage: "http or https",
Destination: &scheme,
},
cli.BoolFlag{
Name: "include-localhost",
Usage: "whether to include localhost:8500 as an endpoint",
Destination: &includeLocalhost,
}, },
cli.IntFlag{ cli.IntFlag{
Name: "filesize", Name: "filesize",
@@ -140,25 +120,25 @@ func main() {
             Name:    "upload_and_sync",
             Aliases: []string{"c"},
             Usage:   "upload and sync",
-            Action:  wrapCliCommand("upload-and-sync", true, uploadAndSync),
+            Action:  wrapCliCommand("upload-and-sync", uploadAndSyncCmd),
         },
         {
             Name:    "feed_sync",
             Aliases: []string{"f"},
             Usage:   "feed update generate, upload and sync",
-            Action:  wrapCliCommand("feed-and-sync", true, feedUploadAndSync),
+            Action:  wrapCliCommand("feed-and-sync", feedUploadAndSyncCmd),
         },
         {
             Name:    "upload_speed",
             Aliases: []string{"u"},
             Usage:   "measure upload speed",
-            Action:  wrapCliCommand("upload-speed", true, uploadSpeed),
+            Action:  wrapCliCommand("upload-speed", uploadSpeedCmd),
         },
         {
             Name:    "sliding_window",
             Aliases: []string{"s"},
             Usage:   "measure network aggregate capacity",
-            Action:  wrapCliCommand("sliding-window", false, slidingWindow),
+            Action:  wrapCliCommand("sliding-window", slidingWindowCmd),
         },
     }
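
With the new flags, a run is pointed at an explicit host list rather than a generated endpoint range. A hypothetical invocation (hostnames are placeholders; only --hosts, --http-port, and --filesize are shown in this diff):

    swarm-smoke --hosts=swarm-0.example.net,swarm-1.example.net,swarm-2.example.net \
        --http-port=8500 --filesize=1 upload_and_sync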

cmd/swarm/swarm-smoke/sliding_window.go

@@ -17,50 +17,62 @@
 package main
 
 import (
-    "crypto/md5"
-    crand "crypto/rand"
+    "bytes"
     "fmt"
-    "io"
     "math/rand"
     "time"
 
     "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/metrics"
+    "github.com/ethereum/go-ethereum/swarm/testutil"
     "github.com/pborman/uuid"
     cli "gopkg.in/urfave/cli.v1"
 )
 
-var seed = time.Now().UTC().UnixNano()
-
-func init() {
-    rand.Seed(seed)
-}
-
 type uploadResult struct {
     hash   string
     digest []byte
 }
 
-func slidingWindow(c *cli.Context) error {
-    generateEndpoints(scheme, cluster, appName, from, to)
+func slidingWindowCmd(ctx *cli.Context, tuid string) error {
+    errc := make(chan error)
+
+    go func() {
+        errc <- slidingWindow(ctx, tuid)
+    }()
+
+    select {
+    case err := <-errc:
+        if err != nil {
+            metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
+        }
+        return err
+    case <-time.After(time.Duration(timeout) * time.Second):
+        metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
+        return fmt.Errorf("timeout after %v sec", timeout)
+    }
+}
+
+func slidingWindow(ctx *cli.Context, tuid string) error {
     hashes := []uploadResult{} //swarm hashes of the uploads
-    nodes := to - from
+    nodes := len(hosts)
     const iterationTimeout = 30 * time.Second
-    log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout)
+    log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout)
     uploadedBytes := 0
     networkDepth := 0
     errored := false
 
 outer:
     for {
-        log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed)
+        log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)
 
-        h := md5.New()
-        r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
-
         t1 := time.Now()
 
-        hash, err := upload(r, filesize*1000, endpoints[0])
+        randomBytes := testutil.RandomBytes(seed, filesize*1000)
+
+        hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
         if err != nil {
             log.Error(err.Error())
             return err
@@ -68,7 +80,11 @@ outer:
 
         metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1)
 
-        fhash := h.Sum(nil)
+        fhash, err := digest(bytes.NewReader(randomBytes))
+        if err != nil {
+            log.Error(err.Error())
+            return err
+        }
 
         log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay)
         hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
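
Both the old inline md5/TeeReader bookkeeping and the new call sites converge on a digest helper that is referenced here but defined in util.go, outside the hunks shown. A minimal sketch of what such a helper plausibly looks like (assumption: it keeps the MD5 comparison semantics of the code it replaces):

    package main

    import (
        "crypto/md5"
        "io"
    )

    // digest drains r and returns its MD5 sum, one helper in place of the
    // io.TeeReader/md5.New bookkeeping each call site used to carry.
    // (Sketch only: the helper's actual body is not part of this diff.)
    func digest(r io.Reader) ([]byte, error) {
        h := md5.New()
        if _, err := io.Copy(h, r); err != nil {
            return nil, err
        }
        return h.Sum(nil), nil
    }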
@@ -88,10 +104,10 @@ outer:
                 metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1)
                 break inner
             default:
-                randIndex := 1 + rand.Intn(len(endpoints)-1)
+                idx := 1 + rand.Intn(len(hosts)-1)
                 ruid := uuid.New()[:8]
                 start := time.Now()
-                err := fetch(v.hash, endpoints[randIndex], v.digest, ruid)
+                err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "")
                 if err != nil {
                     continue inner
                 }

cmd/swarm/swarm-smoke/upload_and_sync.go

@@ -17,84 +17,109 @@
 package main
 
 import (
-    "crypto/md5"
-    crand "crypto/rand"
+    "bytes"
     "fmt"
-    "io"
     "math/rand"
     "sync"
     "time"
 
     "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/metrics"
+    "github.com/ethereum/go-ethereum/swarm/testutil"
     "github.com/pborman/uuid"
     cli "gopkg.in/urfave/cli.v1"
 )
 
-func uploadAndSync(c *cli.Context) error {
-    generateEndpoints(scheme, cluster, appName, from, to)
-    seed := int(time.Now().UnixNano() / 1e6)
-    log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed)
+func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
+    randomBytes := testutil.RandomBytes(seed, filesize*1000)
 
-    h := md5.New()
-    r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
+    errc := make(chan error)
+
+    go func() {
+        errc <- uploadAndSync(ctx, randomBytes, tuid)
+    }()
+
+    select {
+    case err := <-errc:
+        if err != nil {
+            metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
+        }
+        return err
+    case <-time.After(time.Duration(timeout) * time.Second):
+        metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
+
+        // trigger debug functionality on randomBytes
+
+        return fmt.Errorf("timeout after %v sec", timeout)
+    }
+}
+
+func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error {
+    log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed)
 
     t1 := time.Now()
-    hash, err := upload(r, filesize*1000, endpoints[0])
+    hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
     if err != nil {
         log.Error(err.Error())
         return err
     }
-    metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).UpdateSince(t1)
+    t2 := time.Since(t1)
+    metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).Update(t2)
 
-    fhash := h.Sum(nil)
+    fhash, err := digest(bytes.NewReader(randomBytes))
+    if err != nil {
+        log.Error(err.Error())
+        return err
+    }
 
-    log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash))
+    log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))
 
     time.Sleep(time.Duration(syncDelay) * time.Second)
 
     wg := sync.WaitGroup{}
     if single {
-        rand.Seed(time.Now().UTC().UnixNano())
-        randIndex := 1 + rand.Intn(len(endpoints)-1)
+        randIndex := 1 + rand.Intn(len(hosts)-1)
         ruid := uuid.New()[:8]
         wg.Add(1)
         go func(endpoint string, ruid string) {
             for {
                 start := time.Now()
-                err := fetch(hash, endpoint, fhash, ruid)
+                err := fetch(hash, endpoint, fhash, ruid, tuid)
                 if err != nil {
                     continue
                 }
+                ended := time.Since(start)
 
-                metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).UpdateSince(start)
+                metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended)
+                log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint)
                 wg.Done()
                 return
             }
-        }(endpoints[randIndex], ruid)
+        }(httpEndpoint(hosts[randIndex]), ruid)
     } else {
-        for _, endpoint := range endpoints[1:] {
+        for _, endpoint := range hosts[1:] {
             ruid := uuid.New()[:8]
             wg.Add(1)
             go func(endpoint string, ruid string) {
                 for {
                     start := time.Now()
-                    err := fetch(hash, endpoint, fhash, ruid)
+                    err := fetch(hash, endpoint, fhash, ruid, tuid)
                     if err != nil {
                         continue
                     }
+                    ended := time.Since(start)
 
-                    metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).UpdateSince(start)
+                    metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended)
+                    log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint)
                     wg.Done()
                     return
                 }
-            }(endpoint, ruid)
+            }(httpEndpoint(endpoint), ruid)
         }
     }
     wg.Wait()
-    log.Info("all endpoints synced random file successfully")
+    log.Info("all hosts synced random file successfully")
 
     return nil
 }
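
Worth noting: randomBytes is now derived from the package-level seed (declared in util.go below) instead of crypto/rand, so a payload that times out can be regenerated after the run, which is what the "trigger debug functionality on randomBytes" placeholder comments gesture at. A small sketch of that property, assuming testutil.RandomBytes is a deterministic, seed-driven fill:

    package main

    import (
        "bytes"
        "fmt"

        "github.com/ethereum/go-ethereum/swarm/testutil"
    )

    func main() {
        const seed = 42 // hypothetical; the smoke tool seeds from time.Now()
        a := testutil.RandomBytes(seed, 1000)
        b := testutil.RandomBytes(seed, 1000)
        // Same seed, same payload: a file that failed to sync can be rebuilt
        // later purely from the logged seed value.
        fmt.Println("reproducible:", bytes.Equal(a, b))
    }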

cmd/swarm/swarm-smoke/upload_speed.go

@@ -17,35 +17,56 @@
 package main
 
 import (
-    "crypto/md5"
-    crand "crypto/rand"
+    "bytes"
     "fmt"
-    "io"
     "time"
 
    "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/metrics"
+    "github.com/ethereum/go-ethereum/swarm/testutil"
     cli "gopkg.in/urfave/cli.v1"
 )
 
-func uploadSpeed(c *cli.Context) error {
-    endpoint := generateEndpoint(scheme, cluster, appName, from)
-    seed := int(time.Now().UnixNano() / 1e6)
-    log.Info("uploading to "+endpoint, "seed", seed)
+func uploadSpeedCmd(ctx *cli.Context, tuid string) error {
+    log.Info("uploading to "+hosts[0], "tuid", tuid, "seed", seed)
+    randomBytes := testutil.RandomBytes(seed, filesize*1000)
 
-    h := md5.New()
-    r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
+    errc := make(chan error)
+
+    go func() {
+        errc <- uploadSpeed(ctx, tuid, randomBytes)
+    }()
+
+    select {
+    case err := <-errc:
+        if err != nil {
+            metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
+        }
+        return err
+    case <-time.After(time.Duration(timeout) * time.Second):
+        metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
+
+        // trigger debug functionality on randomBytes
+
+        return fmt.Errorf("timeout after %v sec", timeout)
+    }
+}
+
+func uploadSpeed(c *cli.Context, tuid string, data []byte) error {
     t1 := time.Now()
-    hash, err := upload(r, filesize*1000, endpoint)
+    hash, err := upload(data, hosts[0])
     if err != nil {
         log.Error(err.Error())
         return err
     }
     metrics.GetOrRegisterCounter("upload-speed.upload-time", nil).Inc(int64(time.Since(t1)))
 
-    fhash := h.Sum(nil)
+    fhash, err := digest(bytes.NewReader(data))
+    if err != nil {
+        log.Error(err.Error())
+        return err
+    }
 
     log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash))
     return nil

cmd/swarm/swarm-smoke/util.go

@@ -25,9 +25,11 @@ import (
     "fmt"
     "io"
     "io/ioutil"
+    "math/rand"
     "net/http"
     "net/http/httptrace"
     "os"
+    "strings"
     "time"
 
     "github.com/ethereum/go-ethereum/log"
@@ -36,83 +38,49 @@ import (
     "github.com/ethereum/go-ethereum/swarm/api/client"
     "github.com/ethereum/go-ethereum/swarm/spancontext"
     opentracing "github.com/opentracing/opentracing-go"
+    "github.com/pborman/uuid"
     cli "gopkg.in/urfave/cli.v1"
 )
 
 var (
     commandName = ""
+    seed        = int(time.Now().UTC().UnixNano())
 )
 
-func wrapCliCommand(name string, killOnTimeout bool, command func(*cli.Context) error) func(*cli.Context) error {
+func init() {
+    rand.Seed(int64(seed))
+}
+
+func httpEndpoint(host string) string {
+    return fmt.Sprintf("http://%s:%d", host, httpPort)
+}
+
+func wsEndpoint(host string) string {
+    return fmt.Sprintf("ws://%s:%d", host, wsPort)
+}
+
+func wrapCliCommand(name string, command func(*cli.Context, string) error) func(*cli.Context) error {
     return func(ctx *cli.Context) error {
         log.PrintOrigins(true)
         log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false))))
 
+        // test uuid
+        tuid := uuid.New()[:8]
+
+        commandName = name
+
+        hosts = strings.Split(allhosts, ",")
+
         defer func(now time.Time) {
             totalTime := time.Since(now)
-            log.Info("total time", "time", totalTime, "kb", filesize)
+            log.Info("total time", "tuid", tuid, "time", totalTime, "kb", filesize)
             metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
         }(time.Now())
 
-        log.Info("smoke test starting", "task", name, "timeout", timeout)
-
-        commandName = name
-
+        log.Info("smoke test starting", "tuid", tuid, "task", name, "timeout", timeout)
         metrics.GetOrRegisterCounter(name, nil).Inc(1)
 
-        errc := make(chan error)
-        done := make(chan struct{})
-
-        if killOnTimeout {
-            go func() {
-                <-time.After(time.Duration(timeout) * time.Second)
-                close(done)
-            }()
-        }
-
-        go func() {
-            errc <- command(ctx)
-        }()
-
-        select {
-        case err := <-errc:
-            if err != nil {
-                metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", name), nil).Inc(1)
-            }
-            return err
-        case <-done:
-            metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", name), nil).Inc(1)
-            return fmt.Errorf("timeout after %v sec", timeout)
-        }
+        return command(ctx, tuid)
     }
 }
-
-func generateEndpoints(scheme string, cluster string, app string, from int, to int) {
-    if cluster == "prod" {
-        for port := from; port < to; port++ {
-            endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port))
-        }
-    } else if cluster == "private-internal" {
-        for port := from; port < to; port++ {
-            endpoints = append(endpoints, fmt.Sprintf("%s://swarm-private-internal-%v:8500", scheme, port))
-        }
-    } else {
-        for port := from; port < to; port++ {
-            endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster))
-        }
-    }
-
-    if includeLocalhost {
-        endpoints = append(endpoints, "http://localhost:8500")
-    }
-}
-
-//just use the first endpoint
-func generateEndpoint(scheme string, cluster string, app string, from int) string {
-    if cluster == "prod" {
-        return fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, from)
-    } else if cluster == "private-internal" {
-        return fmt.Sprintf("%s://swarm-private-internal-%v:8500", scheme, from)
-    } else {
-        return fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, from, cluster)
-    }
-}
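
The two endpoint helpers are the entire replacement for generateEndpoints/generateEndpoint: a URL is derived per host from the --hosts list plus a port flag, instead of being precomputed from cluster/app/port-range naming conventions. A condensed sketch of the new derivation (host names are placeholders):

    package main

    import "fmt"

    var httpPort = 8500 // set by --http-port; the tool's default is 80

    // httpEndpoint replaces the old endpoint-generation logic: one URL per
    // host, no gateway naming scheme baked in.
    func httpEndpoint(host string) string {
        return fmt.Sprintf("http://%s:%d", host, httpPort)
    }

    func main() {
        hosts := []string{"swarm-0", "swarm-1"} // placeholder --hosts values
        for _, h := range hosts {
            fmt.Println(httpEndpoint(h)) // http://swarm-0:8500, http://swarm-1:8500
        }
    }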
@@ -174,11 +142,11 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid
 }
 
 // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file
-func fetch(hash string, endpoint string, original []byte, ruid string) error {
+func fetch(hash string, endpoint string, original []byte, ruid string, tuid string) error {
     ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch")
     defer sp.Finish()
 
-    log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash)
+    log.Info("http get request", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash)
 
     var tn time.Time
     reqUri := endpoint + "/bzz:/" + hash + "/"
@@ -202,7 +170,7 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error {
         log.Error(err.Error(), "ruid", ruid)
         return err
     }
-    log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength)
+    log.Info("http get response", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength)
 
     if res.StatusCode != 200 {
         err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode)
@@ -230,14 +198,14 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error {
 }
 
 // upload an arbitrary byte as a plaintext file to `endpoint` using the api client
-func upload(r io.Reader, size int, endpoint string) (string, error) {
+func upload(data []byte, endpoint string) (string, error) {
     swarm := client.NewClient(endpoint)
     f := &client.File{
-        ReadCloser: ioutil.NopCloser(r),
+        ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
         ManifestEntry: api.ManifestEntry{
             ContentType: "text/plain",
             Mode:        0660,
-            Size:        int64(size),
+            Size:        int64(len(data)),
         },
     }
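
With the new signature, upload takes the payload itself and derives the size from len(data). A self-contained sketch of an equivalent caller using the swarm api client directly (the endpoint is a placeholder, and the Upload call's exact arguments are an assumption based on this era of the client API):

    package main

    import (
        "bytes"
        "fmt"
        "io/ioutil"

        "github.com/ethereum/go-ethereum/swarm/api"
        "github.com/ethereum/go-ethereum/swarm/api/client"
    )

    // uploadBytes mirrors the new upload() above: the payload is passed as a
    // slice and len(data) supplies the size that used to be a separate argument.
    func uploadBytes(data []byte, endpoint string) (string, error) {
        swarm := client.NewClient(endpoint)
        f := &client.File{
            ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
            ManifestEntry: api.ManifestEntry{
                ContentType: "text/plain",
                Mode:        0660,
                Size:        int64(len(data)),
            },
        }
        // Upload(file, defaultPath, toEncrypt): plaintext, no default path.
        return swarm.Upload(f, "", false)
    }

    func main() {
        hash, err := uploadBytes([]byte("hello swarm"), "http://localhost:8500")
        fmt.Println(hash, err)
    }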
