Use global lock instead of NewExclusivePool to allow distributed lock between multiple Gitea instances (#31813)

Replace #26486 
Fix #19620

---------

Co-authored-by: Jason Song <i@wolfogre.com>
pull/31997/head
Lunny Xiao 3 months ago committed by GitHub
parent a5818470fe
commit 2da2000413
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 15
      assets/go-licenses.json
  2. 6
      custom/conf/app.example.ini
  3. 12
      modules/globallock/globallock.go
  4. 37
      modules/setting/gloabl_lock.go
  5. 35
      modules/setting/global_lock_test.go
  6. 1
      modules/setting/setting.go
  7. 69
      modules/sync/exclusive_pool.go
  8. 13
      services/pull/check.go
  9. 25
      services/pull/merge.go
  10. 15
      services/pull/pull.go
  11. 9
      services/pull/update.go
  12. 32
      services/repository/transfer.go
  13. 23
      services/wiki/wiki.go

File diff suppressed because one or more lines are too long

@ -2713,3 +2713,9 @@ LEVEL = Info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; storage type
;STORAGE_TYPE = local
;[global_lock]
;; Lock service type, could be memory or redis
;SERVICE_TYPE = memory
;; Ignored for the "memory" type. For "redis" use something like `redis://127.0.0.1:6379/0`
;SERVICE_CONN_STR =

@ -6,14 +6,22 @@ package globallock
import (
"context"
"sync"
"code.gitea.io/gitea/modules/setting"
)
var (
defaultLocker Locker
initOnce sync.Once
initFunc = func() {
// TODO: read the setting and initialize the default locker.
// Before implementing this, don't use it.
switch setting.GlobalLock.ServiceType {
case "redis":
defaultLocker = NewRedisLocker(setting.GlobalLock.ServiceConnStr)
case "memory":
fallthrough
default:
defaultLocker = NewMemoryLocker()
}
} // define initFunc as a variable to make it possible to change it in tests
)

@ -0,0 +1,37 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package setting
import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/nosql"
)
// GlobalLock represents configuration of global lock
var GlobalLock = struct {
ServiceType string
ServiceConnStr string
}{
ServiceType: "memory",
}
func loadGlobalLockFrom(rootCfg ConfigProvider) {
sec := rootCfg.Section("global_lock")
GlobalLock.ServiceType = sec.Key("SERVICE_TYPE").MustString("memory")
switch GlobalLock.ServiceType {
case "memory":
case "redis":
connStr := sec.Key("SERVICE_CONN_STR").String()
if connStr == "" {
log.Fatal("SERVICE_CONN_STR is empty for redis")
}
u := nosql.ToRedisURI(connStr)
if u == nil {
log.Fatal("SERVICE_CONN_STR %s is not a valid redis connection string", connStr)
}
GlobalLock.ServiceConnStr = connStr
default:
log.Fatal("Unknown sync lock service type: %s", GlobalLock.ServiceType)
}
}

@ -0,0 +1,35 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package setting
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestLoadGlobalLockConfig(t *testing.T) {
t.Run("DefaultGlobalLockConfig", func(t *testing.T) {
iniStr := ``
cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
loadGlobalLockFrom(cfg)
assert.EqualValues(t, "memory", GlobalLock.ServiceType)
})
t.Run("RedisGlobalLockConfig", func(t *testing.T) {
iniStr := `
[global_lock]
SERVICE_TYPE = redis
SERVICE_CONN_STR = addrs=127.0.0.1:6379 db=0
`
cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
loadGlobalLockFrom(cfg)
assert.EqualValues(t, "redis", GlobalLock.ServiceType)
assert.EqualValues(t, "addrs=127.0.0.1:6379 db=0", GlobalLock.ServiceConnStr)
})
}

@ -147,6 +147,7 @@ func loadCommonSettingsFrom(cfg ConfigProvider) error {
loadGitFrom(cfg)
loadMirrorFrom(cfg)
loadMarkupFrom(cfg)
loadGlobalLockFrom(cfg)
loadOtherFrom(cfg)
return nil
}

@ -1,69 +0,0 @@
// Copyright 2016 The Gogs Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package sync
import (
"sync"
)
// ExclusivePool is a pool of non-identical instances
// that only one instance with same identity is in the pool at a time.
// In other words, only instances with different identities can be in
// the pool the same time. If another instance with same identity tries
// to get into the pool, it hangs until previous instance left the pool.
//
// This pool is particularly useful for performing tasks on same resource
// on the file system in different goroutines.
type ExclusivePool struct {
lock sync.Mutex
// pool maintains locks for each instance in the pool.
pool map[string]*sync.Mutex
// count maintains the number of times an instance with same identity checks in
// to the pool, and should be reduced to 0 (removed from map) by checking out
// with same number of times.
// The purpose of count is to delete lock when count down to 0 and recycle memory
// from map object.
count map[string]int
}
// NewExclusivePool initializes and returns a new ExclusivePool object.
func NewExclusivePool() *ExclusivePool {
return &ExclusivePool{
pool: make(map[string]*sync.Mutex),
count: make(map[string]int),
}
}
// CheckIn checks in an instance to the pool and hangs while instance
// with same identity is using the lock.
func (p *ExclusivePool) CheckIn(identity string) {
p.lock.Lock()
lock, has := p.pool[identity]
if !has {
lock = &sync.Mutex{}
p.pool[identity] = lock
}
p.count[identity]++
p.lock.Unlock()
lock.Lock()
}
// CheckOut checks out an instance from the pool and releases the lock
// to let other instances with same identity to grab the lock.
func (p *ExclusivePool) CheckOut(identity string) {
p.lock.Lock()
defer p.lock.Unlock()
p.pool[identity].Unlock()
if p.count[identity] == 1 {
delete(p.pool, identity)
delete(p.count, identity)
} else {
p.count[identity]--
}
}

@ -21,6 +21,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
@ -334,9 +335,15 @@ func handler(items ...string) []string {
}
func testPR(id int64) {
pullWorkingPool.CheckIn(fmt.Sprint(id))
defer pullWorkingPool.CheckOut(fmt.Sprint(id))
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
ctx := graceful.GetManager().HammerContext()
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(id))
if err != nil {
log.Error("lock.Lock(): %v", err)
return
}
defer releaser()
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Test PR[%d] from patch checking queue", id))
defer finished()
pr, err := issues_model.GetPullRequestByID(ctx, id)

@ -23,6 +23,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/references"
@ -169,9 +170,6 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return fmt.Errorf("unable to load head repo: %w", err)
}
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
prUnit, err := pr.BaseRepo.GetUnit(ctx, unit.TypePullRequests)
if err != nil {
log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err)
@ -184,11 +182,18 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return models.ErrInvalidMergeStyle{ID: pr.BaseRepo.ID, Style: mergeStyle}
}
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
defer func() {
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
}()
_, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message, repo_module.PushTriggerPRMergeToBase)
releaser()
if err != nil {
return err
}
@ -487,10 +492,14 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques
// MergedManually mark pr as merged manually
func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
if err := db.WithTx(ctx, func(ctx context.Context) error {
err = db.WithTx(ctx, func(ctx context.Context) error {
if err := pr.LoadBaseRepo(ctx); err != nil {
return err
}
@ -540,7 +549,9 @@ func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *use
return fmt.Errorf("SetMerged failed")
}
return nil
}); err != nil {
})
releaser()
if err != nil {
return err
}

@ -25,20 +25,21 @@ import (
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
gitea_context "code.gitea.io/gitea/services/context"
issue_service "code.gitea.io/gitea/services/issue"
notify_service "code.gitea.io/gitea/services/notify"
)
// TODO: use clustered lock (unique queue? or *abuse* cache)
var pullWorkingPool = sync.NewExclusivePool()
func getPullWorkingLockKey(prID int64) string {
return fmt.Sprintf("pull_working_%d", prID)
}
// NewPullRequest creates new pull request with labels for repository.
func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *issues_model.Issue, labelIDs []int64, uuids []string, pr *issues_model.PullRequest, assigneeIDs []int64) error {
@ -202,8 +203,12 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss
// ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
// Current target branch is already the same
if pr.BaseBranch == targetBranch {

@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models/unit"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/repository"
)
@ -25,8 +26,12 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
return fmt.Errorf("update of agit flow pull request's head branch is unsupported")
}
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
diffCount, err := GetDiverging(ctx, pr)
if err != nil {

@ -18,16 +18,16 @@ import (
project_model "code.gitea.io/gitea/models/project"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
notify_service "code.gitea.io/gitea/services/notify"
)
// repoWorkingPool represents a working pool to order the parallel changes to the same repository
// TODO: use clustered lock (unique queue? or *abuse* cache)
var repoWorkingPool = sync.NewExclusivePool()
func getRepoWorkingLockKey(repoID int64) string {
return fmt.Sprintf("repo_working_%d", repoID)
}
// TransferOwnership transfers all corresponding setting from old user to new one.
func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, repo *repo_model.Repository, teams []*organization.Team) error {
@ -42,12 +42,17 @@ func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, rep
oldOwner := repo.Owner
repoWorkingPool.CheckIn(fmt.Sprint(repo.ID))
releaser, err := globallock.Lock(ctx, getRepoWorkingLockKey(repo.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
if err := transferOwnership(ctx, doer, newOwner.Name, repo); err != nil {
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
return err
}
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser()
newRepo, err := repo_model.GetRepositoryByID(ctx, repo.ID)
if err != nil {
@ -360,15 +365,20 @@ func ChangeRepositoryName(ctx context.Context, doer *user_model.User, repo *repo
oldRepoName := repo.Name
// Change repository directory name. We must lock the local copy of the
// repo so that we can atomically rename the repo path and updates the
// repo so that we can automatically rename the repo path and updates the
// local copy's origin accordingly.
repoWorkingPool.CheckIn(fmt.Sprint(repo.ID))
releaser, err := globallock.Lock(ctx, getRepoWorkingLockKey(repo.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
if err := changeRepositoryName(ctx, repo, newRepoName); err != nil {
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
return err
}
repoWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser()
repo.Name = newRepoName
notify_service.RenameRepository(ctx, doer, repo, oldRepoName)

@ -18,19 +18,20 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
asymkey_service "code.gitea.io/gitea/services/asymkey"
repo_service "code.gitea.io/gitea/services/repository"
)
// TODO: use clustered lock (unique queue? or *abuse* cache)
var wikiWorkingPool = sync.NewExclusivePool()
const DefaultRemote = "origin"
func getWikiWorkingLockKey(repoID int64) string {
return fmt.Sprintf("wiki_working_%d", repoID)
}
// InitWiki initializes a wiki for repository,
// it does nothing when repository already has wiki.
func InitWiki(ctx context.Context, repo *repo_model.Repository) error {
@ -89,8 +90,11 @@ func updateWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
if err = validateWebPath(newWikiName); err != nil {
return err
}
wikiWorkingPool.CheckIn(fmt.Sprint(repo.ID))
defer wikiWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser, err := globallock.Lock(ctx, getWikiWorkingLockKey(repo.ID))
if err != nil {
return err
}
defer releaser()
if err = InitWiki(ctx, repo); err != nil {
return fmt.Errorf("InitWiki: %w", err)
@ -250,8 +254,11 @@ func DeleteWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
return err
}
wikiWorkingPool.CheckIn(fmt.Sprint(repo.ID))
defer wikiWorkingPool.CheckOut(fmt.Sprint(repo.ID))
releaser, err := globallock.Lock(ctx, getWikiWorkingLockKey(repo.ID))
if err != nil {
return err
}
defer releaser()
if err = InitWiki(ctx, repo); err != nil {
return fmt.Errorf("InitWiki: %w", err)

Loading…
Cancel
Save