all: use timer instead of time.After in loops, to avoid memleaks (#29241)

time.After is equivalent to NewTimer(d).C, and does not call Stop if the timer is no longer needed. This can cause memory leaks. This change updates many such occasions to use NewTimer instead, calling Stop once the timer is no longer needed.
pull/29499/head
Bin 5 months ago committed by GitHub
parent 1126c6d8a5
commit 0bbd88bda0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      core/bloombits/matcher.go
  2. 6
      eth/downloader/beaconsync.go
  3. 12
      eth/downloader/downloader.go
  4. 5
      ethstats/ethstats.go
  5. 5
      p2p/simulations/adapters/exec.go
  6. 10
      p2p/simulations/mocker.go
  7. 5
      p2p/simulations/network.go

@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
// of the session, any request in-flight need to be responded to! Empty responses // of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case. // are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()
for { for {
// Allocate a new bloom bit index to retrieve data for, stopping when done // Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval() bit, ok := s.allocateRetrieval()
@ -604,6 +607,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
} }
// Bit allocated, throttle a bit if we're below our batch limit // Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch { if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select { select {
case <-s.quit: case <-s.quit:
// Session terminating, we can't meaningfully service, abort // Session terminating, we can't meaningfully service, abort
@ -611,7 +615,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
s.deliverSections(bit, []uint64{}, [][]byte{}) s.deliverSections(bit, []uint64{}, [][]byte{})
return return
case <-time.After(wait): case <-waitTimer.C:
// Throttling up, fetch whatever is available // Throttling up, fetch whatever is available
} }
} }

@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
localHeaders = d.readHeaderRange(tail, int(count)) localHeaders = d.readHeaderRange(tail, int(count))
log.Warn("Retrieved beacon headers from local", "from", from, "count", count) log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
} }
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()
for { for {
// Some beacon headers might have appeared since the last cycle, make // Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones // sure we're always syncing to all available ones
@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
} }
// State sync still going, wait a bit for new headers and retry // State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...") log.Trace("Pivot not yet committed, waiting...")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select { select {
case <-time.After(fsHeaderContCheck): case <-fsHeaderContCheckTimer.C:
case <-d.cancelCh: case <-d.cancelCh:
return errCanceled return errCanceled
} }

@ -1276,7 +1276,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
var ( var (
mode = d.getMode() mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process gotHeaders = false // Wait for batches of headers to process
timer = time.NewTimer(time.Second)
) )
defer timer.Stop()
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
@ -1397,10 +1400,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
if mode == FullSync || mode == SnapSync { if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit // If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return errCanceled return errCanceled
case <-time.After(time.Second): case <-timer.C:
} }
} }
// Otherwise insert the headers for content retrieval // Otherwise insert the headers for content retrieval
@ -1567,7 +1571,10 @@ func (d *Downloader) processSnapSyncContent() error {
var ( var (
oldPivot *fetchResult // Locked in pivot block, might change eventually oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot oldTail []*fetchResult // Downloaded content after the pivot
timer = time.NewTimer(time.Second)
) )
defer timer.Stop()
for { for {
// Wait for the next batch of downloaded data to be available. If we have // Wait for the next batch of downloaded data to be available. If we have
// not yet reached the pivot point, wait blockingly as there's no need to // not yet reached the pivot point, wait blockingly as there's no need to
@ -1650,6 +1657,7 @@ func (d *Downloader) processSnapSyncContent() error {
oldPivot = P oldPivot = P
} }
// Wait for completion, occasionally checking for pivot staleness // Wait for completion, occasionally checking for pivot staleness
timer.Reset(time.Second)
select { select {
case <-sync.done: case <-sync.done:
if sync.err != nil { if sync.err != nil {
@ -1660,7 +1668,7 @@ func (d *Downloader) processSnapSyncContent() error {
} }
oldPivot = nil oldPivot = nil
case <-time.After(time.Second): case <-timer.C:
oldTail = afterP oldTail = afterP
continue continue
} }

@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
return err return err
} }
// Wait for the pong request to arrive back // Wait for the pong request to arrive back
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select { select {
case <-s.pongCh: case <-s.pongCh:
// Pong delivered, report the latency // Pong delivered, report the latency
case <-time.After(5 * time.Second): case <-timer.C:
// Ping timeout, abort // Ping timeout, abort
return errors.New("ping timed out") return errors.New("ping timed out")
} }

@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
go func() { go func() {
waitErr <- n.Cmd.Wait() waitErr <- n.Cmd.Wait()
}() }()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select { select {
case err := <-waitErr: case err := <-waitErr:
return err return err
case <-time.After(5 * time.Second): case <-timer.C:
return n.Cmd.Process.Kill() return n.Cmd.Process.Kill()
} }
} }

@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
if err != nil { if err != nil {
panic("Could not startup node network for mocker") panic("Could not startup node network for mocker")
} }
tick := time.NewTicker(10 * time.Second) var (
tick = time.NewTicker(10 * time.Second)
timer = time.NewTimer(3 * time.Second)
)
defer tick.Stop() defer tick.Stop()
defer timer.Stop()
for { for {
select { select {
case <-quit: case <-quit:
@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
return return
} }
timer.Reset(3 * time.Second)
select { select {
case <-quit: case <-quit:
log.Info("Terminating simulation loop") log.Info("Terminating simulation loop")
return return
case <-time.After(3 * time.Second): case <-timer.C:
} }
log.Debug("starting node", "id", id) log.Debug("starting node", "id", id)

@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error {
} }
} }
timeout := time.NewTimer(snapshotLoadTimeout)
defer timeout.Stop()
select { select {
// Wait until all connections from the snapshot are established. // Wait until all connections from the snapshot are established.
case <-allConnected: case <-allConnected:
// Make sure that we do not wait forever. // Make sure that we do not wait forever.
case <-time.After(snapshotLoadTimeout): case <-timeout.C:
return errors.New("snapshot connections not established") return errors.New("snapshot connections not established")
} }
return nil return nil

Loading…
Cancel
Save