|
|
|
@ -5,6 +5,7 @@ package issues |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"os" |
|
|
|
|
"runtime/pprof" |
|
|
|
|
"sync/atomic" |
|
|
|
@ -202,11 +203,16 @@ func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMeta |
|
|
|
|
func populateIssueIndexer(ctx context.Context) { |
|
|
|
|
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true) |
|
|
|
|
defer finished() |
|
|
|
|
if err := PopulateIssueIndexer(ctx, true); err != nil { |
|
|
|
|
log.Error("Issue indexer population failed: %v", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func PopulateIssueIndexer(ctx context.Context, keepRetrying bool) error { |
|
|
|
|
for page := 1; ; page++ { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
log.Warn("Issue Indexer population shutdown before completion") |
|
|
|
|
return |
|
|
|
|
return fmt.Errorf("shutdown before completion: %w", ctx.Err()) |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{ |
|
|
|
@ -221,21 +227,23 @@ func populateIssueIndexer(ctx context.Context) { |
|
|
|
|
} |
|
|
|
|
if len(repos) == 0 { |
|
|
|
|
log.Debug("Issue Indexer population complete") |
|
|
|
|
return |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, repo := range repos { |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
log.Info("Issue Indexer population shutdown before completion") |
|
|
|
|
return |
|
|
|
|
return fmt.Errorf("shutdown before completion: %w", ctx.Err()) |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
if err := updateRepoIndexer(ctx, repo.ID); err != nil { |
|
|
|
|
if keepRetrying && ctx.Err() == nil { |
|
|
|
|
log.Warn("Retry to populate issue indexer for repo %d: %v", repo.ID, err) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err) |
|
|
|
|
} |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|