diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 6f75b8357ea..baac0973939 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -124,7 +124,10 @@ func (q *ChannelQueue) Shutdown() { log.Trace("ChannelQueue: %s Flushing", q.name) // We can't use Cleanup here because that will close the channel if err := q.FlushWithContext(q.terminateCtx); err != nil { - log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + count := atomic.LoadInt64(&q.numInQueue) + if count > 0 { + log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + } return } log.Debug("ChannelQueue: %s Flushed", q.name) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index c7526714c65..91f91f0dfc8 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( }, Workers: 0, }, - DataDir: config.DataDir, + DataDir: config.DataDir, + QueueName: config.Name + "-level", } levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar) @@ -172,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) atTerminate(q.Terminate) - if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 { // Just run the level queue - we shut it down once it's flushed go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { - for !q.IsEmpty() { - _ = q.internal.Flush(0) + for !lq.IsEmpty() { + _ = lq.Flush(0) select { case <-time.After(100 * time.Millisecond): - case <-q.internal.(*LevelQueue).shutdownCtx.Done(): - log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + case <-lq.shutdownCtx.Done(): + if lq.byteFIFO.Len(lq.terminateCtx) > 0 { + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + } return } } @@ -316,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() { // Redirect all remaining data in the chan to the internal channel log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) close(q.channelQueue.dataChan) + countOK, countLost := 0, 0 for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) + err := q.internal.Push(data) + if err != nil { + log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err) + countLost++ + } else { + countOK++ + } atomic.AddInt64(&q.channelQueue.numInQueue, -1) } + if countLost > 0 { + log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost) + } else if countOK > 0 { + log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK) + } log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 318610355e4..4f14a5d79df 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -39,7 +39,7 @@ func TestPersistableChannelQueue(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "first", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) @@ -135,7 +135,7 @@ func TestPersistableChannelQueue(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "second", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) @@ -227,7 +227,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "first", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) @@ -433,7 +433,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) { Workers: 1, BoostWorkers: 0, MaxWorkers: 10, - Name: "second", + Name: "test-queue", }, &testData{}) assert.NoError(t, err) pausable, ok = queue.(Pausable) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index c43bd1db3f7..62c051aa393 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -177,7 +177,9 @@ func (q *ChannelUniqueQueue) Shutdown() { go func() { log.Trace("ChannelUniqueQueue: %s Flushing", q.name) if err := q.FlushWithContext(q.terminateCtx); err != nil { - log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + if !q.IsEmpty() { + log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + } return } log.Debug("ChannelUniqueQueue: %s Flushed", q.name) diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go index 9372694b87a..824015b834f 100644 --- a/modules/queue/unique_queue_channel_test.go +++ b/modules/queue/unique_queue_channel_test.go @@ -8,10 +8,13 @@ import ( "testing" "time" + "code.gitea.io/gitea/modules/log" + "github.com/stretchr/testify/assert" ) func TestChannelUniqueQueue(t *testing.T) { + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) handleChan := make(chan *testData) handle := func(data ...Data) []Data { for _, datum := range data { @@ -52,6 +55,8 @@ func TestChannelUniqueQueue(t *testing.T) { } func TestChannelUniqueQueue_Batch(t *testing.T) { + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) + handleChan := make(chan *testData) handle := func(data ...Data) []Data { for _, datum := range data { @@ -98,6 +103,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) { } func TestChannelUniqueQueue_Pause(t *testing.T) { + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) + lock := sync.Mutex{} var queue Queue var err error diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 405726182dc..cc8a807c672 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac }, Workers: 0, }, - DataDir: config.DataDir, + DataDir: config.DataDir, + QueueName: config.Name + "-level", } queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue) @@ -209,17 +210,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) atTerminate(q.Terminate) _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 { + if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() { // Just run the level queue - we shut it down once it's flushed - go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go luq.Run(func(_ func()) {}, func(_ func()) {}) go func() { - _ = q.internal.Flush(0) - log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name()) - q.internal.(*LevelUniqueQueue).Shutdown() - GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + _ = luq.Flush(0) + for !luq.IsEmpty() { + _ = luq.Flush(0) + select { + case <-time.After(100 * time.Millisecond): + case <-luq.shutdownCtx.Done(): + if luq.byteFIFO.Len(luq.terminateCtx) > 0 { + log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name()) + } + return + } + } + log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name()) + luq.Shutdown() + GetManager().Remove(luq.qid) }() } else { log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + _ = q.internal.Flush(0) q.internal.(*LevelUniqueQueue).Shutdown() GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) } @@ -285,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { // Redirect all remaining data in the chan to the internal channel close(q.channelQueue.dataChan) log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + countOK, countLost := 0, 0 for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) + err := q.internal.(*LevelUniqueQueue).Push(data) + if err != nil { + log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err) + countLost++ + } else { + countOK++ + } + } + if countLost > 0 { + log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost) + } else if countOK > 0 { + log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK) } log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) diff --git a/modules/queue/unique_queue_disk_channel_test.go b/modules/queue/unique_queue_disk_channel_test.go new file mode 100644 index 00000000000..fd76163f4aa --- /dev/null +++ b/modules/queue/unique_queue_disk_channel_test.go @@ -0,0 +1,259 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package queue + +import ( + "fmt" + "strconv" + "sync" + "testing" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/stretchr/testify/assert" +) + +func TestPersistableChannelUniqueQueue(t *testing.T) { + tmpDir := t.TempDir() + fmt.Printf("TempDir %s\n", tmpDir) + _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) + + // Common function to create the Queue + newQueue := func(name string, handle func(data ...Data) []Data) Queue { + q, err := NewPersistableChannelUniqueQueue(handle, + PersistableChannelUniqueQueueConfiguration{ + Name: name, + DataDir: tmpDir, + QueueLength: 200, + MaxWorkers: 1, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 1, + Workers: 0, + }, "task-0") + assert.NoError(t, err) + return q + } + + // runs the provided queue and provides some timer function + type channels struct { + readyForShutdown chan struct{} // closed when shutdown functions have been assigned + readyForTerminate chan struct{} // closed when terminate functions have been assigned + signalShutdown chan struct{} // Should close to signal shutdown + doneShutdown chan struct{} // closed when shutdown function is done + queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock + } + runQueue := func(q Queue, lock *sync.Mutex) *channels { + chans := &channels{ + readyForShutdown: make(chan struct{}), + readyForTerminate: make(chan struct{}), + signalShutdown: make(chan struct{}), + doneShutdown: make(chan struct{}), + } + go q.Run(func(atShutdown func()) { + go func() { + lock.Lock() + select { + case <-chans.readyForShutdown: + default: + close(chans.readyForShutdown) + } + lock.Unlock() + <-chans.signalShutdown + atShutdown() + close(chans.doneShutdown) + }() + }, func(atTerminate func()) { + lock.Lock() + defer lock.Unlock() + select { + case <-chans.readyForTerminate: + default: + close(chans.readyForTerminate) + } + chans.queueTerminate = append(chans.queueTerminate, atTerminate) + }) + + return chans + } + + // call to shutdown and terminate the queue associated with the channels + doTerminate := func(chans *channels, lock *sync.Mutex) { + <-chans.readyForTerminate + + lock.Lock() + callbacks := []func(){} + callbacks = append(callbacks, chans.queueTerminate...) + lock.Unlock() + + for _, callback := range callbacks { + callback() + } + } + + mapLock := sync.Mutex{} + executedInitial := map[string][]string{} + hasInitial := map[string][]string{} + + fillQueue := func(name string, done chan struct{}) { + t.Run("Initial Filling: "+name, func(t *testing.T) { + lock := sync.Mutex{} + + startAt100Queued := make(chan struct{}) + stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item + + handle := func(data ...Data) []Data { + <-startAt100Queued + for _, datum := range data { + s := datum.(string) + mapLock.Lock() + executedInitial[name] = append(executedInitial[name], s) + mapLock.Unlock() + if s == "task-20" { + close(stopAt20Shutdown) + } + } + return nil + } + + q := newQueue(name, handle) + + // add 100 tasks to the queue + for i := 0; i < 100; i++ { + _ = q.Push("task-" + strconv.Itoa(i)) + } + close(startAt100Queued) + + chans := runQueue(q, &lock) + + <-chans.readyForShutdown + <-stopAt20Shutdown + close(chans.signalShutdown) + <-chans.doneShutdown + _ = q.Push("final") + + // check which tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + mapLock.Lock() + hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i)) + mapLock.Unlock() + } + } + if has, _ := q.(UniqueQueue).Has("final"); has { + mapLock.Lock() + hasInitial[name] = append(hasInitial[name], "final") + mapLock.Unlock() + } else { + assert.Fail(t, "UnqueQueue %s should have \"final\"", name) + } + doTerminate(chans, &lock) + mapLock.Lock() + assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) + mapLock.Unlock() + }) + close(done) + } + + doneA := make(chan struct{}) + doneB := make(chan struct{}) + + go fillQueue("QueueA", doneA) + go fillQueue("QueueB", doneB) + + <-doneA + <-doneB + + executedEmpty := map[string][]string{} + hasEmpty := map[string][]string{} + emptyQueue := func(name string, done chan struct{}) { + t.Run("Empty Queue: "+name, func(t *testing.T) { + lock := sync.Mutex{} + stop := make(chan struct{}) + + // collect the tasks that have been executed + handle := func(data ...Data) []Data { + lock.Lock() + for _, datum := range data { + mapLock.Lock() + executedEmpty[name] = append(executedEmpty[name], datum.(string)) + mapLock.Unlock() + if datum.(string) == "final" { + close(stop) + } + } + lock.Unlock() + return nil + } + + q := newQueue(name, handle) + chans := runQueue(q, &lock) + + <-chans.readyForShutdown + <-stop + close(chans.signalShutdown) + <-chans.doneShutdown + + // check which tasks are still in the queue + for i := 0; i < 100; i++ { + if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { + mapLock.Lock() + hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i)) + mapLock.Unlock() + } + } + doTerminate(chans, &lock) + + mapLock.Lock() + assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name])) + assert.Equal(t, 0, len(hasEmpty[name])) + mapLock.Unlock() + }) + close(done) + } + + doneA = make(chan struct{}) + doneB = make(chan struct{}) + + go emptyQueue("QueueA", doneA) + go emptyQueue("QueueB", doneB) + + <-doneA + <-doneB + + mapLock.Lock() + t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", + len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) + + // reset and rerun + executedInitial = map[string][]string{} + hasInitial = map[string][]string{} + executedEmpty = map[string][]string{} + hasEmpty = map[string][]string{} + mapLock.Unlock() + + doneA = make(chan struct{}) + doneB = make(chan struct{}) + + go fillQueue("QueueA", doneA) + go fillQueue("QueueB", doneB) + + <-doneA + <-doneB + + doneA = make(chan struct{}) + doneB = make(chan struct{}) + + go emptyQueue("QueueA", doneA) + go emptyQueue("QueueB", doneB) + + <-doneA + <-doneB + + mapLock.Lock() + t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", + len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) + mapLock.Unlock() +}