swarm-smoke: add syncDelay flag

swarm/network: add want delay timer to syncing (#1367)

swarm/network: synchronise peer.close() (#1369)
pull/19550/head
Anton Evangelatov 6 years ago
parent ad6c39012f
commit 8802b9ce7f
  1. 7
      cmd/swarm/swarm-smoke/main.go
  2. 8
      cmd/swarm/swarm-smoke/sliding_window.go
  3. 6
      cmd/swarm/swarm-smoke/upload_and_sync.go
  4. 16
      swarm/network/stream/messages.go
  5. 5
      swarm/network/stream/peer.go

@ -40,7 +40,7 @@ var (
allhosts string allhosts string
hosts []string hosts []string
filesize int filesize int
syncDelay int syncDelay bool
inputSeed int inputSeed int
httpPort int httpPort int
wsPort int wsPort int
@ -87,10 +87,9 @@ func main() {
Usage: "file size for generated random file in KB", Usage: "file size for generated random file in KB",
Destination: &filesize, Destination: &filesize,
}, },
cli.IntFlag{ cli.BoolFlag{
Name: "sync-delay", Name: "sync-delay",
Value: 5, Usage: "wait for content to be synced",
Usage: "duration of delay in seconds to wait for content to be synced",
Destination: &syncDelay, Destination: &syncDelay,
}, },
cli.IntFlag{ cli.IntFlag{

@ -81,9 +81,13 @@ outer:
return err return err
} }
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay) log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "wait for sync", syncDelay)
hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
time.Sleep(time.Duration(syncDelay) * time.Second)
if syncDelay {
waitToSync()
}
uploadedBytes += filesize * 1000 uploadedBytes += filesize * 1000
q := make(chan struct{}, 1) q := make(chan struct{}, 1)
d := make(chan struct{}) d := make(chan struct{})

@ -197,7 +197,8 @@ func getBzzAddrFromHost(client *rpc.Client) (string, error) {
// we make an ugly assumption about the output format of the hive.String() method // we make an ugly assumption about the output format of the hive.String() method
// ideally we should replace this with an API call that returns the bzz addr for a given host, // ideally we should replace this with an API call that returns the bzz addr for a given host,
// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time) // but this also works for now (provided we don't change the hive.String() method, which we haven't in some time)
return strings.Split(strings.Split(hive, "\n")[3], " ")[10], nil ss := strings.Split(strings.Split(hive, "\n")[3], " ")
return ss[len(ss)-1], nil
} }
// checkChunksVsMostProxHosts is checking: // checkChunksVsMostProxHosts is checking:
@ -284,6 +285,8 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))
// wait to sync and log chunks before fetch attempt, only if syncDelay is set to true
if syncDelay {
waitToSync() waitToSync()
log.Debug("chunks before fetch attempt", "hash", hash) log.Debug("chunks before fetch attempt", "hash", hash)
@ -292,6 +295,7 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
}
if onlyUpload { if onlyUpload {
log.Debug("only-upload is true, stopping test", "hash", hash) log.Debug("only-upload is true, stopping test", "hash", hash)

@ -223,6 +223,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return fmt.Errorf("error initialising bitvector of length %v: %v", lenHashes/HashSize, err) return fmt.Errorf("error initialising bitvector of length %v: %v", lenHashes/HashSize, err)
} }
var wantDelaySet bool
var wantDelay time.Time
ctr := 0 ctr := 0
errC := make(chan error) errC := make(chan error)
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
@ -234,6 +237,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if wait := c.NeedData(ctx, hash); wait != nil { if wait := c.NeedData(ctx, hash); wait != nil {
ctr++ ctr++
want.Set(i/HashSize, true) want.Set(i/HashSize, true)
// measure how long it takes before we mark chunks for retrieval, and actually send the request
if !wantDelaySet {
wantDelaySet = true
wantDelay = time.Now()
}
// create request and wait until the chunk data arrives and is stored // create request and wait until the chunk data arrives and is stored
go func(w func(context.Context) error) { go func(w func(context.Context) error) {
select { select {
@ -304,6 +314,12 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return return
} }
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
// record want delay
if wantDelaySet {
metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay)
}
err := p.SendPriority(ctx, msg, c.priority) err := p.SendPriority(ctx, msg, c.priority)
if err != nil { if err != nil {
log.Warn("SendPriority error", "err", err) log.Warn("SendPriority error", "err", err)

@ -415,9 +415,14 @@ func (p *Peer) removeClientParams(s Stream) error {
} }
func (p *Peer) close() { func (p *Peer) close() {
p.serverMu.Lock()
defer p.serverMu.Unlock()
for _, s := range p.servers { for _, s := range p.servers {
s.Close() s.Close()
} }
p.servers = nil
} }
// runUpdateSyncing is a long running function that creates the initial // runUpdateSyncing is a long running function that creates the initial

Loading…
Cancel
Save