diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 6a4cfb23db..486581fe23 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [] // of the session, any request in-flight need to be responded to! Empty responses // are fine though in that case. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { + waitTimer := time.NewTimer(wait) + defer waitTimer.Stop() + for { // Allocate a new bloom bit index to retrieve data for, stopping when done bit, ok := s.allocateRetrieval() @@ -604,6 +607,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan } // Bit allocated, throttle a bit if we're below our batch limit if s.pendingSections(bit) < batch { + waitTimer.Reset(wait) select { case <-s.quit: // Session terminating, we can't meaningfully service, abort @@ -611,7 +615,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan s.deliverSections(bit, []uint64{}, [][]byte{}) return - case <-time.After(wait): + case <-waitTimer.C: // Throttling up, fetch whatever is available } } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index d3f75c8527..7dfc419f4e 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { localHeaders = d.readHeaderRange(tail, int(count)) log.Warn("Retrieved beacon headers from local", "from", from, "count", count) } + fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) + defer fsHeaderContCheckTimer.Stop() + for { // Some beacon headers might have appeared since the last cycle, make // sure we're always syncing to all available ones @@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { } // State sync still going, wait a bit for new headers and retry log.Trace("Pivot not yet committed, waiting...") + fsHeaderContCheckTimer.Reset(fsHeaderContCheck) select { - case <-time.After(fsHeaderContCheck): + case <-fsHeaderContCheckTimer.C: case <-d.cancelCh: return errCanceled } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6b26822e22..941f575aa8 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1276,7 +1276,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode var ( mode = d.getMode() gotHeaders = false // Wait for batches of headers to process + timer = time.NewTimer(time.Second) ) + defer timer.Stop() + for { select { case <-d.cancelCh: @@ -1397,10 +1400,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode if mode == FullSync || mode == SnapSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + timer.Reset(time.Second) select { case <-d.cancelCh: return errCanceled - case <-time.After(time.Second): + case <-timer.C: } } // Otherwise insert the headers for content retrieval @@ -1567,7 +1571,10 @@ func (d *Downloader) processSnapSyncContent() error { var ( oldPivot *fetchResult // Locked in pivot block, might change eventually oldTail []*fetchResult // Downloaded content after the pivot + timer = time.NewTimer(time.Second) ) + defer timer.Stop() + for { // Wait for the next batch of downloaded data to be available. If we have // not yet reached the pivot point, wait blockingly as there's no need to @@ -1650,6 +1657,7 @@ func (d *Downloader) processSnapSyncContent() error { oldPivot = P } // Wait for completion, occasionally checking for pivot staleness + timer.Reset(time.Second) select { case <-sync.done: if sync.err != nil { @@ -1660,7 +1668,7 @@ func (d *Downloader) processSnapSyncContent() error { } oldPivot = nil - case <-time.After(time.Second): + case <-timer.C: oldTail = afterP continue } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 6e71666ec1..c845db1164 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error { return err } // Wait for the pong request to arrive back + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case <-s.pongCh: // Pong delivered, report the latency - case <-time.After(5 * time.Second): + case <-timer.C: // Ping timeout, abort return errors.New("ping timed out") } diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 5df2d7649c..6307b90bf8 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error { go func() { waitErr <- n.Cmd.Wait() }() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case err := <-waitErr: return err - case <-time.After(5 * time.Second): + case <-timer.C: return n.Cmd.Process.Kill() } } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 0dc04e65f9..8763df67ef 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { if err != nil { panic("Could not startup node network for mocker") } - tick := time.NewTicker(10 * time.Second) + var ( + tick = time.NewTicker(10 * time.Second) + timer = time.NewTimer(3 * time.Second) + ) defer tick.Stop() + defer timer.Stop() + for { select { case <-quit: @@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { return } + timer.Reset(3 * time.Second) select { case <-quit: log.Info("Terminating simulation loop") return - case <-time.After(3 * time.Second): + case <-timer.C: } log.Debug("starting node", "id", id) diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 0225a3bbaa..2eb8333cd6 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error { } } + timeout := time.NewTimer(snapshotLoadTimeout) + defer timeout.Stop() + select { // Wait until all connections from the snapshot are established. case <-allConnected: // Make sure that we do not wait forever. - case <-time.After(snapshotLoadTimeout): + case <-timeout.C: return errors.New("snapshot connections not established") } return nil