From 91e7ad569ac9590b521e5c4fdfb2162f528db49f Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 7 Sep 2020 23:05:08 +0800 Subject: [PATCH] Add queue for code indexer (#10332) * Add queue for code indexer * Fix lint * Fix test * Fix lint * Fix bug * Fix bug * Fix lint * Add noqueue * Fix tests * Rename noqueue to immediate --- integrations/mssql.ini.tmpl | 3 + integrations/mysql.ini.tmpl | 3 + integrations/mysql8.ini.tmpl | 3 + integrations/pgsql.ini.tmpl | 3 + integrations/repo_search_test.go | 13 +- integrations/sqlite.ini.tmpl | 3 + modules/indexer/code/elastic_search.go | 5 + modules/indexer/code/indexer.go | 158 ++++++++++++++++++++++++- modules/indexer/code/queue.go | 154 ------------------------ modules/queue/queue.go | 59 ++++++++- 10 files changed, 233 insertions(+), 171 deletions(-) delete mode 100644 modules/indexer/code/queue.go diff --git a/integrations/mssql.ini.tmpl b/integrations/mssql.ini.tmpl index a8fbbe7fe5a..cfb35941261 100644 --- a/integrations/mssql.ini.tmpl +++ b/integrations/mssql.ini.tmpl @@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mssql/issues.bleve REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = integrations/indexers-mssql/repos.bleve +[queue.code_indexer] +TYPE = immediate + [repository] ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mssql/gitea-repositories diff --git a/integrations/mysql.ini.tmpl b/integrations/mysql.ini.tmpl index 5691311660c..8e3d2b3f14e 100644 --- a/integrations/mysql.ini.tmpl +++ b/integrations/mysql.ini.tmpl @@ -16,6 +16,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql/issues.bleve REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = integrations/indexers-mysql/repos.bleve +[queue.code_indexer] +TYPE = immediate + [repository] ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql/gitea-repositories diff --git a/integrations/mysql8.ini.tmpl b/integrations/mysql8.ini.tmpl index a135ecb9812..ca77babf4b7 100644 --- a/integrations/mysql8.ini.tmpl +++ b/integrations/mysql8.ini.tmpl @@ 
-14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql8/issues.bleve REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = integrations/indexers-mysql8/repos.bleve +[queue.code_indexer] +TYPE = immediate + [repository] ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql8/gitea-repositories diff --git a/integrations/pgsql.ini.tmpl b/integrations/pgsql.ini.tmpl index 4cac2585fbe..802296cf631 100644 --- a/integrations/pgsql.ini.tmpl +++ b/integrations/pgsql.ini.tmpl @@ -15,6 +15,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-pgsql/issues.bleve REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = integrations/indexers-pgsql/repos.bleve +[queue.code_indexer] +TYPE = immediate + [repository] ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-pgsql/gitea-repositories diff --git a/integrations/repo_search_test.go b/integrations/repo_search_test.go index 701013735c9..6f2ee374600 100644 --- a/integrations/repo_search_test.go +++ b/integrations/repo_search_test.go @@ -7,7 +7,6 @@ package integrations import ( "net/http" "testing" - "time" "code.gitea.io/gitea/models" code_indexer "code.gitea.io/gitea/modules/indexer/code" @@ -62,14 +61,6 @@ func testSearch(t *testing.T, url string, expected []string) { assert.EqualValues(t, expected, filenames) } -func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository, ...chan<- error)) { - waiter := make(chan error, 1) - op(repo, waiter) - - select { - case err := <-waiter: - assert.NoError(t, err) - case <-time.After(1 * time.Minute): - assert.Fail(t, "Repository indexer took too long") - } +func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository)) { + op(repo) } diff --git a/integrations/sqlite.ini.tmpl b/integrations/sqlite.ini.tmpl index e899328c81f..5d54c5f9fab 100644 --- a/integrations/sqlite.ini.tmpl +++ b/integrations/sqlite.ini.tmpl @@ -10,6 +10,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-sqlite/issues.bleve REPO_INDEXER_ENABLED = true REPO_INDEXER_PATH = 
integrations/indexers-sqlite/repos.bleve +[queue.code_indexer] +TYPE = immediate + [repository] ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-sqlite/gitea-repositories diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go index 4f690ed8065..db36c5e0c4f 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elastic_search.go @@ -168,6 +168,11 @@ func (b *ElasticSearchIndexer) init() (bool, error) { } func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { + // Ignore vendored files in code search + if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { + return nil, nil + } + stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). RunInDir(repo.RepoPath()) if err != nil { diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 468955cd893..54563733980 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -14,6 +14,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" ) @@ -38,7 +39,7 @@ type SearchResultLanguages struct { Count int } -// Indexer defines an interface to indexer issues contents +// Indexer defines an interface to index and search code contents type Indexer interface { Index(repo *models.Repository, sha string, changes *repoChanges) error Delete(repoID int64) error @@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string { return indexerID[index+1:] } +// IndexerData represents data stored in the code indexer +type IndexerData struct { + RepoID int64 + IsDelete bool +} + +var ( + indexerQueue queue.Queue +) + +func index(indexer Indexer, repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) + if err != nil { + return 
err + } + + sha, err := getDefaultBranchSha(repo) + if err != nil { + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil + } + + if err := indexer.Index(repo, sha, changes); err != nil { + return err + } + + return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) +} + // Init initialize the repo indexer func Init() { if !setting.Indexer.RepoIndexerEnabled { @@ -74,8 +109,6 @@ func Init() { return } - initQueue(setting.Indexer.UpdateQueueLength) - ctx, cancel := context.WithCancel(context.Background()) graceful.GetManager().RunAtTerminate(ctx, func() { @@ -85,6 +118,46 @@ func Init() { }) waitChannel := make(chan time.Duration) + + // Create the Queue + switch setting.Indexer.RepoType { + case "bleve", "elasticsearch": + handler := func(data ...queue.Data) { + idx, err := indexer.get() + if idx == nil || err != nil { + log.Error("Codes indexer handler: unable to get indexer!") + return + } + + for _, datum := range data { + indexerData, ok := datum.(*IndexerData) + if !ok { + log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) + continue + } + log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete) + + if indexerData.IsDelete { + if err := indexer.Delete(indexerData.RepoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else { + if err := index(indexer, indexerData.RepoID); err != nil { + log.Error("index: %v", err) + continue + } + } + } + } + + indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{}) + if indexerQueue == nil { + log.Fatal("Unable to create codes indexer queue") + } + default: + log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType) + } + go func() { start := time.Now() var ( @@ -139,10 +212,11 @@ func Init() { indexer.set(rIndexer) - go processRepoIndexerOperationQueue(indexer) + // Start processing the queue + go 
graceful.GetManager().RunWithShutdownFns(indexerQueue.Run) if populate { - go populateRepoIndexer() + go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) } select { case waitChannel <- time.Since(start): @@ -179,3 +253,77 @@ func Init() { }() } } + +// DeleteRepoFromIndexer remove all of a repository's entries from the indexer +func DeleteRepoFromIndexer(repo *models.Repository) { + indexData := &IndexerData{RepoID: repo.ID, IsDelete: true} + if err := indexerQueue.Push(indexData); err != nil { + log.Error("Delete repo index data %v failed: %v", indexData, err) + } +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *models.Repository) { + indexData := &IndexerData{RepoID: repo.ID} + if err := indexerQueue.Push(indexData); err != nil { + log.Error("Update repo index data %v failed: %v", indexData, err) + } +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer(ctx context.Context) { + log.Info("Populating the repo indexer with existing repositories") + + exist, err := models.IsTableNotEmpty("repository") + if err != nil { + log.Fatal("System error: %v", err) + } else if !exist { + return + } + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("System error: %v", err) + } + + var maxRepoID int64 + if maxRepoID, err = models.GetMaxID("repository"); err != nil { + log.Fatal("System error: %v", err) + } + + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. 
+ for maxRepoID > 0 { + select { + case <-ctx.Done(): + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(ids) == 0 { + break + } + for _, id := range ids { + select { + case <-ctx.Done(): + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { + log.Error("indexerQueue.Push: %v", err) + return + } + maxRepoID = id - 1 + } + } + log.Info("Done (re)populating the repo indexer with existing repositories") +} diff --git a/modules/indexer/code/queue.go b/modules/indexer/code/queue.go deleted file mode 100644 index 844003e1fcc..00000000000 --- a/modules/indexer/code/queue.go +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package code - -import ( - "os" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" -) - -type repoIndexerOperation struct { - repoID int64 - deleted bool - watchers []chan<- error -} - -var repoIndexerOperationQueue chan repoIndexerOperation - -func initQueue(queueLength int) { - repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength) -} - -func index(indexer Indexer, repoID int64) error { - repo, err := models.GetRepositoryByID(repoID) - if err != nil { - return err - } - - sha, err := getDefaultBranchSha(repo) - if err != nil { - return err - } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return err - } else if changes == nil { - return nil - } - - if err := indexer.Index(repo, sha, changes); err != nil { - return err - } - - return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) -} - -func processRepoIndexerOperationQueue(indexer Indexer) { - for { - select { - case op := <-repoIndexerOperationQueue: - var err error - if op.deleted { - if err = indexer.Delete(op.repoID); err != nil { - log.Error("indexer.Delete: %v", err) - } - } else { - if err = index(indexer, op.repoID); err != nil { - log.Error("indexer.Index: %v", err) - } - } - for _, watcher := range op.watchers { - watcher <- err - } - case <-graceful.GetManager().IsShutdown(): - log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) - return - } - } -} - -// DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) -} - -// UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) -} - -func 
addOperationToQueue(op repoIndexerOperation) { - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - repoIndexerOperationQueue <- op - }() - } -} - -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -func populateRepoIndexer() { - log.Info("Populating the repo indexer with existing repositories") - - isShutdown := graceful.GetManager().IsShutdown() - - exist, err := models.IsTableNotEmpty("repository") - if err != nil { - log.Fatal("System error: %v", err) - } else if !exist { - return - } - - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { - log.Fatal("System error: %v", err) - } - - var maxRepoID int64 - if maxRepoID, err = models.GetMaxID("repository"); err != nil { - log.Fatal("System error: %v", err) - } - - // start with the maximum existing repo ID and work backwards, so that we - // don't include repos that are created after gitea starts; such repos will - // already be added to the indexer, and we don't need to add them again. 
- for maxRepoID > 0 { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50) - if err != nil { - log.Error("populateRepoIndexer: %v", err) - return - } else if len(ids) == 0 { - break - } - for _, id := range ids { - select { - case <-isShutdown: - log.Info("Repository Indexer population shutdown before completion") - return - default: - } - repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: id, - deleted: false, - } - maxRepoID = id - 1 - } - } - log.Info("Done (re)populating the repo indexer with existing repositories") -} diff --git a/modules/queue/queue.go b/modules/queue/queue.go index e3c63310bef..d08cba35a1e 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -106,7 +106,64 @@ func (*DummyQueue) IsEmpty() bool { return true } -var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue} +// ImmediateType is the queue type that executes the handler immediately on push +const ImmediateType Type = "immediate" + +// NewImmediate creates a pseudo-queue that executes the handler immediately on push +func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) { + return &Immediate{ + handler: handler, + }, nil +} + +// Immediate represents a direct execution queue +type Immediate struct { + handler HandlerFunc +} + +// Run does nothing +func (*Immediate) Run(_, _ func(context.Context, func())) {} + +// Push passes the data directly to the handler instead of queueing it +func (q *Immediate) Push(data Data) error { + return q.PushFunc(data, nil) +} + +// PushFunc runs the provided function (if it is not nil) and, if that succeeds, passes the data directly to the handler. 
+func (q *Immediate) PushFunc(data Data, f func() error) error { + if f != nil { + if err := f(); err != nil { + return err + } + } + q.handler(data) + return nil +} + +// Has always returns false as this queue never stores any data +func (*Immediate) Has(Data) (bool, error) { + return false, nil +} + +// Flush always returns nil +func (*Immediate) Flush(time.Duration) error { + return nil +} + +// FlushWithContext always returns nil +func (*Immediate) FlushWithContext(context.Context) error { + return nil +} + +// IsEmpty always returns true as data is handled on push and never stored +func (*Immediate) IsEmpty() bool { + return true +} + +var queuesMap = map[Type]NewQueueFunc{ + DummyQueueType: NewDummyQueue, + ImmediateType: NewImmediate, +} // RegisteredTypes provides the list of requested types of queues func RegisteredTypes() []Type {