From 6ed812db131b0186e1023a5c5c389c4c9dbd08a3 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 1 Jul 2021 14:01:19 +0200 Subject: [PATCH] les: avoid shutdown hang (#23139) --- les/servingqueue.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/les/servingqueue.go b/les/servingqueue.go index 16e064cb3..10c7e6f48 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -159,27 +159,23 @@ func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64 // run tokens from the token channel and allow the corresponding tasks to run // without entering the priority queue. func (sq *servingQueue) threadController() { + defer sq.wg.Done() for { token := make(runToken) select { case best := <-sq.queueBestCh: best.tokenCh <- token case <-sq.stopThreadCh: - sq.wg.Done() return case <-sq.quit: - sq.wg.Done() return } - <-token select { case <-sq.stopThreadCh: - sq.wg.Done() return case <-sq.quit: - sq.wg.Done() return - default: + case <-token: } } } @@ -298,6 +294,7 @@ func (sq *servingQueue) addTask(task *servingTask) { // and always tries to send the highest priority task to queueBestCh. Successfully sent // tasks are removed from the queue. func (sq *servingQueue) queueLoop() { + defer sq.wg.Done() for { if sq.best != nil { expTime := sq.best.expTime @@ -316,7 +313,6 @@ func (sq *servingQueue) queueLoop() { sq.best, _ = sq.queue.PopItem().(*servingTask) } case <-sq.quit: - sq.wg.Done() return } } else { @@ -324,7 +320,6 @@ func (sq *servingQueue) queueLoop() { case task := <-sq.queueAddCh: sq.addTask(task) case <-sq.quit: - sq.wg.Done() return } } @@ -335,6 +330,7 @@ func (sq *servingQueue) queueLoop() { // of active thread controller goroutines. func (sq *servingQueue) threadCountLoop() { var threadCountTarget int + defer sq.wg.Done() for { for threadCountTarget > sq.threadCount { sq.wg.Add(1) @@ -347,14 +343,12 @@ func (sq *servingQueue) threadCountLoop() { case sq.stopThreadCh <- struct{}{}: sq.threadCount-- case <-sq.quit: - sq.wg.Done() return } } else { select { case threadCountTarget = <-sq.setThreadsCh: case <-sq.quit: - sq.wg.Done() return } }