@ -16,17 +16,9 @@ import (
)
)
func runWorkerPoolQueue [ T any ] ( q * WorkerPoolQueue [ T ] ) func ( ) {
func runWorkerPoolQueue [ T any ] ( q * WorkerPoolQueue [ T ] ) func ( ) {
var stop func ( )
go q . Run ( )
started := make ( chan struct { } )
stopped := make ( chan struct { } )
go func ( ) {
q . Run ( func ( f func ( ) ) { stop = f ; close ( started ) } , nil )
close ( stopped )
} ( )
<- started
return func ( ) {
return func ( ) {
stop ( )
q . ShutdownWait ( 1 * time . Second )
<- stopped
}
}
}
}
@ -57,7 +49,7 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) {
return unhandled
return unhandled
}
}
q , _ := NewWorkerPoolQueueBySetting ( "test-workpoolqueue" , queueSetting , handler , false )
q , _ := newWorkerPoolQueueForTest ( "test-workpoolqueue" , queueSetting , handler , false )
stop := runWorkerPoolQueue ( q )
stop := runWorkerPoolQueue ( q )
for i := 0 ; i < queueSetting . Length ; i ++ {
for i := 0 ; i < queueSetting . Length ; i ++ {
testRecorder . Record ( "push:%v" , i )
testRecorder . Record ( "push:%v" , i )
@ -145,7 +137,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
return nil
return nil
}
}
q , _ := NewWorkerPoolQueueBySetting ( "pr_patch_checker_test" , queueSetting , testHandler , true )
q , _ := newWorkerPoolQueueForTest ( "pr_patch_checker_test" , queueSetting , testHandler , true )
stop := runWorkerPoolQueue ( q )
stop := runWorkerPoolQueue ( q )
for i := 0 ; i < testCount ; i ++ {
for i := 0 ; i < testCount ; i ++ {
_ = q . Push ( "task-" + strconv . Itoa ( i ) )
_ = q . Push ( "task-" + strconv . Itoa ( i ) )
@ -169,7 +161,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSett
return nil
return nil
}
}
q , _ := NewWorkerPoolQueueBySetting ( "pr_patch_checker_test" , queueSetting , testHandler , true )
q , _ := newWorkerPoolQueueForTest ( "pr_patch_checker_test" , queueSetting , testHandler , true )
stop := runWorkerPoolQueue ( q )
stop := runWorkerPoolQueue ( q )
assert . NoError ( t , q . FlushWithContext ( context . Background ( ) , 0 ) )
assert . NoError ( t , q . FlushWithContext ( context . Background ( ) , 0 ) )
stop ( )
stop ( )
@ -194,7 +186,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
return nil
return nil
}
}
q , _ := NewWorkerPoolQueueBySetting ( "test-workpoolqueue" , setting . QueueSettings { Type : "channel" , BatchLength : 1 , MaxWorkers : 1 , Length : 100 } , handler , false )
q , _ := newWorkerPoolQueueForTest ( "test-workpoolqueue" , setting . QueueSettings { Type : "channel" , BatchLength : 1 , MaxWorkers : 1 , Length : 100 } , handler , false )
stop := runWorkerPoolQueue ( q )
stop := runWorkerPoolQueue ( q )
for i := 0 ; i < 5 ; i ++ {
for i := 0 ; i < 5 ; i ++ {
assert . NoError ( t , q . Push ( i ) )
assert . NoError ( t , q . Push ( i ) )
@ -210,7 +202,7 @@ func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
assert . EqualValues ( t , 1 , q . GetWorkerNumber ( ) ) // there is at least one worker after the queue begins working
assert . EqualValues ( t , 1 , q . GetWorkerNumber ( ) ) // there is at least one worker after the queue begins working
stop ( )
stop ( )
q , _ = NewWorkerPoolQueueBySetting ( "test-workpoolqueue" , setting . QueueSettings { Type : "channel" , BatchLength : 1 , MaxWorkers : 3 , Length : 100 } , handler , false )
q , _ = newWorkerPoolQueueForTest ( "test-workpoolqueue" , setting . QueueSettings { Type : "channel" , BatchLength : 1 , MaxWorkers : 3 , Length : 100 } , handler , false )
stop = runWorkerPoolQueue ( q )
stop = runWorkerPoolQueue ( q )
for i := 0 ; i < 15 ; i ++ {
for i := 0 ; i < 15 ; i ++ {
assert . NoError ( t , q . Push ( i ) )
assert . NoError ( t , q . Push ( i ) )
@ -238,23 +230,23 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
if items [ 0 ] == 0 {
if items [ 0 ] == 0 {
close ( handlerCalled )
close ( handlerCalled )
}
}
time . Sleep ( 1 00 * time . Millisecond )
time . Sleep ( 4 00 * time . Millisecond )
return items
return items
}
}
qs := setting . QueueSettings { Type : "level" , Datadir : t . TempDir ( ) + "/queue" , BatchLength : 3 , MaxWorkers : 4 , Length : 20 }
qs := setting . QueueSettings { Type : "level" , Datadir : t . TempDir ( ) + "/queue" , BatchLength : 3 , MaxWorkers : 4 , Length : 20 }
q , _ := NewWorkerPoolQueueBySetting ( "test-workpoolqueue" , qs , handler , false )
q , _ := newWorkerPoolQueueForTest ( "test-workpoolqueue" , qs , handler , false )
stop := runWorkerPoolQueue ( q )
stop := runWorkerPoolQueue ( q )
for i := 0 ; i < qs . Length ; i ++ {
for i := 0 ; i < qs . Length ; i ++ {
assert . NoError ( t , q . Push ( i ) )
assert . NoError ( t , q . Push ( i ) )
}
}
<- handlerCalled
<- handlerCalled
time . Sleep ( 5 0 * time . Millisecond ) // wait for a while to make sure all workers are active
time . Sleep ( 20 0 * time . Millisecond ) // wait for a while to make sure all workers are active
assert . EqualValues ( t , 4 , q . GetWorkerActiveNumber ( ) )
assert . EqualValues ( t , 4 , q . GetWorkerActiveNumber ( ) )
stop ( ) // stop triggers shutdown
stop ( ) // stop triggers shutdown
assert . EqualValues ( t , 0 , q . GetWorkerActiveNumber ( ) )
assert . EqualValues ( t , 0 , q . GetWorkerActiveNumber ( ) )
// no item was ever handled, so we still get all of them again
// no item was ever handled, so we still get all of them again
q , _ = NewWorkerPoolQueueBySetting ( "test-workpoolqueue" , qs , handler , false )
q , _ = newWorkerPoolQueueForTest ( "test-workpoolqueue" , qs , handler , false )
assert . EqualValues ( t , 20 , q . GetQueueItemNumber ( ) )
assert . EqualValues ( t , 20 , q . GetQueueItemNumber ( ) )
}
}