Refactor globallock (#31933)

Follow #31908. The main refactor is that it has removed the returned
context of `Lock`.

The returned context of `Lock` in old code is to provide a way to let
callers know that they have lost the lock. But in most cases, callers
shouldn't cancel what they are doing even it has lost the lock. And the
design would confuse developers and make them use it incorrectly.

See the discussion history:
https://github.com/go-gitea/gitea/pull/31813#discussion_r1732041513 and
https://github.com/go-gitea/gitea/pull/31813#discussion_r1734078998

It's a breaking change, but since the new module hasn't been used yet, I
think it's OK to not add the `pr/breaking` label.

## Design principles

It's almost copied from #31908, but with some changes.

### Use spinlock even in memory implementation (unchanged)

In actual use cases, users may cancel requests. `sync.Mutex` will block
the goroutine until the lock is acquired even if the request is
canceled. And the spinlock is more suitable for this scenario since it's
possible to give up the lock acquisition.

Although the spinlock consumes more CPU resources, I think it's
acceptable in most cases.

### Do not expose the mutex to callers (unchanged)

If we expose the mutex to callers, it's possible for callers to reuse
the mutex, which causes more complexity.

For example:
```go
lock := GetLocker(key)
lock.Lock()
// ...
// even if the lock is unlocked, we cannot GC the lock,
// since the caller may still use it again.
lock.Unlock()
lock.Lock()
// ...
lock.Unlock()

// callers have to GC the lock manually.
RemoveLocker(key)
```

That's why
https://github.com/go-gitea/gitea/pull/31813#discussion_r1721200549

In this PR, we only expose `ReleaseFunc` to callers. So callers just
need to call `ReleaseFunc` to release the lock, and do not need to care
about the lock's lifecycle.
```go
release, err := locker.Lock(ctx, key)
if err != nil {
    return err
}
// ...
release()

// if callers want to lock again, they have to re-acquire the lock.
release, err := locker.Lock(ctx, key)
// ...
```

In this way, it's also much easier for redis implementation to extend
the mutex automatically, so that callers do not need to care about the
lock's lifecycle. See also
https://github.com/go-gitea/gitea/pull/31813#discussion_r1722659743

### Use "release" instead of "unlock" (unchanged)

For "unlock", it has the meaning of "unlock an acquired lock". So it's
not acceptable to call "unlock" when failed to acquire the lock, or call
"unlock" multiple times. It causes more complexity for callers to decide
whether to call "unlock" or not.

So we use "release" instead of "unlock" to make it clear. Whether the
lock is acquired or not, callers can always call "release", and it's
also safe to call "release" multiple times.

But the code DO NOT expect callers to not call "release" after acquiring
the lock. If callers forget to call "release", it will cause resource
leak. That's why it's always safe to call "release" without extra
checks: to avoid callers to forget to call it.

### Acquired locks could be lost, but the callers shouldn't stop

Unlike `sync.Mutex` which will be locked forever once acquired until
calling `Unlock`, for distributed lock, the acquired lock could be lost.

For example, the caller has acquired the lock, and it holds the lock for
a long time since auto-extending is working for redis. However, it lost
the connection to the redis server, and it's impossible to extend the
lock anymore.

In #31908, it will cancel the context to make the operation stop, but
it's not safe. Many operations are not revert-able. If they have been
interrupted, then the instance goes corrupted. So `Lock` won't return
`ctx` anymore in this PR.

### Multiple ways to use the lock

1. Regular way

```go
release, err := Lock(ctx, key)
if err != nil {
    return err
}
defer release()
// ...
```

2. Early release

```go
release, err := Lock(ctx, key)
if err != nil {
    return err
}
defer release()
// ...
// release the lock earlier
release()
// continue to do something else
// ...
```

3. Functional way

```go
if err := LockAndDo(ctx, key, func(ctx context.Context) error {
    // ...
    return nil
}); err != nil {
    return err
}
```
pull/31939/head^2
Jason Song 3 months ago committed by GitHub
parent 7207d93f01
commit bc0977f1c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      modules/globallock/globallock.go
  2. 30
      modules/globallock/locker.go
  3. 70
      modules/globallock/locker_test.go
  4. 27
      modules/globallock/memory_locker.go
  5. 49
      modules/globallock/redis_locker.go

@ -27,20 +27,20 @@ func DefaultLocker() Locker {
// Lock tries to acquire a lock for the given key, it uses the default locker. // Lock tries to acquire a lock for the given key, it uses the default locker.
// Read the documentation of Locker.Lock for more information about the behavior. // Read the documentation of Locker.Lock for more information about the behavior.
func Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) { func Lock(ctx context.Context, key string) (ReleaseFunc, error) {
return DefaultLocker().Lock(ctx, key) return DefaultLocker().Lock(ctx, key)
} }
// TryLock tries to acquire a lock for the given key, it uses the default locker. // TryLock tries to acquire a lock for the given key, it uses the default locker.
// Read the documentation of Locker.TryLock for more information about the behavior. // Read the documentation of Locker.TryLock for more information about the behavior.
func TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) { func TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
return DefaultLocker().TryLock(ctx, key) return DefaultLocker().TryLock(ctx, key)
} }
// LockAndDo tries to acquire a lock for the given key and then calls the given function. // LockAndDo tries to acquire a lock for the given key and then calls the given function.
// It uses the default locker, and it will return an error if failed to acquire the lock. // It uses the default locker, and it will return an error if failed to acquire the lock.
func LockAndDo(ctx context.Context, key string, f func(context.Context) error) error { func LockAndDo(ctx context.Context, key string, f func(context.Context) error) error {
ctx, release, err := Lock(ctx, key) release, err := Lock(ctx, key)
if err != nil { if err != nil {
return err return err
} }
@ -52,7 +52,7 @@ func LockAndDo(ctx context.Context, key string, f func(context.Context) error) e
// TryLockAndDo tries to acquire a lock for the given key and then calls the given function. // TryLockAndDo tries to acquire a lock for the given key and then calls the given function.
// It uses the default locker, and it will return false if failed to acquire the lock. // It uses the default locker, and it will return false if failed to acquire the lock.
func TryLockAndDo(ctx context.Context, key string, f func(context.Context) error) (bool, error) { func TryLockAndDo(ctx context.Context, key string, f func(context.Context) error) (bool, error) {
ok, ctx, release, err := TryLock(ctx, key) ok, release, err := TryLock(ctx, key)
if err != nil { if err != nil {
return false, err return false, err
} }

@ -5,56 +5,34 @@ package globallock
import ( import (
"context" "context"
"fmt"
) )
type Locker interface { type Locker interface {
// Lock tries to acquire a lock for the given key, it blocks until the lock is acquired or the context is canceled. // Lock tries to acquire a lock for the given key, it blocks until the lock is acquired or the context is canceled.
// //
// Lock returns a new context which should be used in the following code.
// The new context will be canceled when the lock is released or lost - yes, it's possible to lose a lock.
// For example, it lost the connection to the redis server while holding the lock.
// If it fails to acquire the lock, the returned context will be the same as the input context.
//
// Lock returns a ReleaseFunc to release the lock, it cannot be nil. // Lock returns a ReleaseFunc to release the lock, it cannot be nil.
// It's always safe to call this function even if it fails to acquire the lock, and it will do nothing in that case. // It's always safe to call this function even if it fails to acquire the lock, and it will do nothing in that case.
// And it's also safe to call it multiple times, but it will only release the lock once. // And it's also safe to call it multiple times, but it will only release the lock once.
// That's why it's called ReleaseFunc, not UnlockFunc. // That's why it's called ReleaseFunc, not UnlockFunc.
// But be aware that it's not safe to not call it at all; it could lead to a memory leak. // But be aware that it's not safe to not call it at all; it could lead to a memory leak.
// So a recommended pattern is to use defer to call it: // So a recommended pattern is to use defer to call it:
// ctx, release, err := locker.Lock(ctx, "key") // release, err := locker.Lock(ctx, "key")
// if err != nil {
// return err
// }
// defer release()
// The ReleaseFunc will return the original context which was used to acquire the lock.
// It's useful when you want to continue to do something after releasing the lock.
// At that time, the ctx will be canceled, and you can use the returned context by the ReleaseFunc to continue:
// ctx, release, err := locker.Lock(ctx, "key")
// if err != nil { // if err != nil {
// return err // return err
// } // }
// defer release() // defer release()
// doSomething(ctx)
// ctx = release()
// doSomethingElse(ctx)
// Please ignore it and use `defer release()` instead if you don't need this, to avoid forgetting to release the lock.
// //
// Lock returns an error if failed to acquire the lock. // Lock returns an error if failed to acquire the lock.
// Be aware that even the context is not canceled, it's still possible to fail to acquire the lock. // Be aware that even the context is not canceled, it's still possible to fail to acquire the lock.
// For example, redis is down, or it reached the maximum number of tries. // For example, redis is down, or it reached the maximum number of tries.
Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) Lock(ctx context.Context, key string) (ReleaseFunc, error)
// TryLock tries to acquire a lock for the given key, it returns immediately. // TryLock tries to acquire a lock for the given key, it returns immediately.
// It follows the same pattern as Lock, but it doesn't block. // It follows the same pattern as Lock, but it doesn't block.
// And if it fails to acquire the lock because it's already locked, not other reasons like redis is down, // And if it fails to acquire the lock because it's already locked, not other reasons like redis is down,
// it will return false without any error. // it will return false without any error.
TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error)
} }
// ReleaseFunc is a function that releases a lock. // ReleaseFunc is a function that releases a lock.
// It returns the original context which was used to acquire the lock. type ReleaseFunc func()
type ReleaseFunc func() context.Context
// ErrLockReleased is used as context cause when a lock is released
var ErrLockReleased = fmt.Errorf("lock released")

@ -47,27 +47,24 @@ func TestLocker(t *testing.T) {
func testLocker(t *testing.T, locker Locker) { func testLocker(t *testing.T, locker Locker) {
t.Run("lock", func(t *testing.T) { t.Run("lock", func(t *testing.T) {
parentCtx := context.Background() parentCtx := context.Background()
ctx, release, err := locker.Lock(parentCtx, "test") release, err := locker.Lock(parentCtx, "test")
defer release() defer release()
assert.NotEqual(t, parentCtx, ctx) // new context should be returned
assert.NoError(t, err) assert.NoError(t, err)
func() { func() {
parentCtx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
ctx, release, err := locker.Lock(parentCtx, "test") release, err := locker.Lock(ctx, "test")
defer release() defer release()
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, parentCtx, ctx) // should return the same context
}() }()
release() release()
assert.Error(t, ctx.Err())
func() { func() {
_, release, err := locker.Lock(context.Background(), "test") release, err := locker.Lock(context.Background(), "test")
defer release() defer release()
assert.NoError(t, err) assert.NoError(t, err)
@ -76,29 +73,26 @@ func testLocker(t *testing.T, locker Locker) {
t.Run("try lock", func(t *testing.T) { t.Run("try lock", func(t *testing.T) {
parentCtx := context.Background() parentCtx := context.Background()
ok, ctx, release, err := locker.TryLock(parentCtx, "test") ok, release, err := locker.TryLock(parentCtx, "test")
defer release() defer release()
assert.True(t, ok) assert.True(t, ok)
assert.NotEqual(t, parentCtx, ctx) // new context should be returned
assert.NoError(t, err) assert.NoError(t, err)
func() { func() {
parentCtx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
ok, ctx, release, err := locker.TryLock(parentCtx, "test") ok, release, err := locker.TryLock(ctx, "test")
defer release() defer release()
assert.False(t, ok) assert.False(t, ok)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, parentCtx, ctx) // should return the same context
}() }()
release() release()
assert.Error(t, ctx.Err())
func() { func() {
ok, _, release, _ := locker.TryLock(context.Background(), "test") ok, release, _ := locker.TryLock(context.Background(), "test")
defer release() defer release()
assert.True(t, ok) assert.True(t, ok)
@ -107,7 +101,7 @@ func testLocker(t *testing.T, locker Locker) {
t.Run("wait and acquired", func(t *testing.T) { t.Run("wait and acquired", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
_, release, err := locker.Lock(ctx, "test") release, err := locker.Lock(ctx, "test")
require.NoError(t, err) require.NoError(t, err)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
@ -115,7 +109,7 @@ func testLocker(t *testing.T, locker Locker) {
go func() { go func() {
defer wg.Done() defer wg.Done()
started := time.Now() started := time.Now()
_, release, err := locker.Lock(context.Background(), "test") // should be blocked for seconds release, err := locker.Lock(context.Background(), "test") // should be blocked for seconds
defer release() defer release()
assert.Greater(t, time.Since(started), time.Second) assert.Greater(t, time.Since(started), time.Second)
assert.NoError(t, err) assert.NoError(t, err)
@ -127,34 +121,15 @@ func testLocker(t *testing.T, locker Locker) {
wg.Wait() wg.Wait()
}) })
t.Run("continue after release", func(t *testing.T) {
ctx := context.Background()
ctxBeforeLock := ctx
ctx, release, err := locker.Lock(ctx, "test")
require.NoError(t, err)
assert.NoError(t, ctx.Err())
assert.NotEqual(t, ctxBeforeLock, ctx)
ctxBeforeRelease := ctx
ctx = release()
assert.NoError(t, ctx.Err())
assert.Error(t, ctxBeforeRelease.Err())
// so it can continue with ctx to do more work
})
t.Run("multiple release", func(t *testing.T) { t.Run("multiple release", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
_, release1, err := locker.Lock(ctx, "test") release1, err := locker.Lock(ctx, "test")
require.NoError(t, err) require.NoError(t, err)
release1() release1()
_, release2, err := locker.Lock(ctx, "test") release2, err := locker.Lock(ctx, "test")
defer release2() defer release2()
require.NoError(t, err) require.NoError(t, err)
@ -163,7 +138,7 @@ func testLocker(t *testing.T, locker Locker) {
// and it shouldn't affect the other lock // and it shouldn't affect the other lock
release1() release1()
ok, _, release3, err := locker.TryLock(ctx, "test") ok, release3, err := locker.TryLock(ctx, "test")
defer release3() defer release3()
require.NoError(t, err) require.NoError(t, err)
// It should be able to acquire the lock; // It should be able to acquire the lock;
@ -184,28 +159,23 @@ func testRedisLocker(t *testing.T, locker *redisLocker) {
// Otherwise, it will affect other tests. // Otherwise, it will affect other tests.
t.Run("close", func(t *testing.T) { t.Run("close", func(t *testing.T) {
assert.NoError(t, locker.Close()) assert.NoError(t, locker.Close())
_, _, err := locker.Lock(context.Background(), "test") _, err := locker.Lock(context.Background(), "test")
assert.Error(t, err) assert.Error(t, err)
}) })
}() }()
t.Run("failed extend", func(t *testing.T) { t.Run("failed extend", func(t *testing.T) {
ctx, release, err := locker.Lock(context.Background(), "test") release, err := locker.Lock(context.Background(), "test")
defer release() defer release()
require.NoError(t, err) require.NoError(t, err)
// It simulates that there are some problems with extending like network issues or redis server down. // It simulates that there are some problems with extending like network issues or redis server down.
v, ok := locker.mutexM.Load("test") v, ok := locker.mutexM.Load("test")
require.True(t, ok) require.True(t, ok)
m := v.(*redisMutex) m := v.(*redsync.Mutex)
_, _ = m.mutex.Unlock() // release it to make it impossible to extend _, _ = m.Unlock() // release it to make it impossible to extend
select { // In current design, callers can't know the lock can't be extended.
case <-time.After(redisLockExpiry + time.Second): // Just keep this case to improve the test coverage.
t.Errorf("lock should be expired")
case <-ctx.Done():
var errTaken *redsync.ErrTaken
assert.ErrorAs(t, context.Cause(ctx), &errTaken)
}
}) })
} }

@ -19,18 +19,13 @@ func NewMemoryLocker() Locker {
return &memoryLocker{} return &memoryLocker{}
} }
func (l *memoryLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) { func (l *memoryLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
originalCtx := ctx
if l.tryLock(key) { if l.tryLock(key) {
ctx, cancel := context.WithCancelCause(ctx)
releaseOnce := sync.Once{} releaseOnce := sync.Once{}
return ctx, func() context.Context { return func() {
releaseOnce.Do(func() { releaseOnce.Do(func() {
l.locks.Delete(key) l.locks.Delete(key)
cancel(ErrLockReleased)
}) })
return originalCtx
}, nil }, nil
} }
@ -39,39 +34,31 @@ func (l *memoryLocker) Lock(ctx context.Context, key string) (context.Context, R
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx, func() context.Context { return originalCtx }, ctx.Err() return func() {}, ctx.Err()
case <-ticker.C: case <-ticker.C:
if l.tryLock(key) { if l.tryLock(key) {
ctx, cancel := context.WithCancelCause(ctx)
releaseOnce := sync.Once{} releaseOnce := sync.Once{}
return ctx, func() context.Context { return func() {
releaseOnce.Do(func() { releaseOnce.Do(func() {
l.locks.Delete(key) l.locks.Delete(key)
cancel(ErrLockReleased)
}) })
return originalCtx
}, nil }, nil
} }
} }
} }
} }
func (l *memoryLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) { func (l *memoryLocker) TryLock(_ context.Context, key string) (bool, ReleaseFunc, error) {
originalCtx := ctx
if l.tryLock(key) { if l.tryLock(key) {
ctx, cancel := context.WithCancelCause(ctx)
releaseOnce := sync.Once{} releaseOnce := sync.Once{}
return true, ctx, func() context.Context { return true, func() {
releaseOnce.Do(func() { releaseOnce.Do(func() {
cancel(ErrLockReleased)
l.locks.Delete(key) l.locks.Delete(key)
}) })
return originalCtx
}, nil }, nil
} }
return false, ctx, func() context.Context { return originalCtx }, nil return false, func() {}, nil
} }
func (l *memoryLocker) tryLock(key string) bool { func (l *memoryLocker) tryLock(key string) bool {

@ -48,21 +48,21 @@ func NewRedisLocker(connection string) Locker {
return l return l
} }
func (l *redisLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) { func (l *redisLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
return l.lock(ctx, key, 0) return l.lock(ctx, key, 0)
} }
func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) { func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
ctx, f, err := l.lock(ctx, key, 1) f, err := l.lock(ctx, key, 1)
var ( var (
errTaken *redsync.ErrTaken errTaken *redsync.ErrTaken
errNodeTaken *redsync.ErrNodeTaken errNodeTaken *redsync.ErrNodeTaken
) )
if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) { if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) {
return false, ctx, f, nil return false, f, nil
} }
return err == nil, ctx, f, err return err == nil, f, err
} }
// Close closes the locker. // Close closes the locker.
@ -76,18 +76,11 @@ func (l *redisLocker) Close() error {
return nil return nil
} }
type redisMutex struct { func (l *redisLocker) lock(ctx context.Context, key string, tries int) (ReleaseFunc, error) {
mutex *redsync.Mutex
cancel context.CancelCauseFunc
}
func (l *redisLocker) lock(ctx context.Context, key string, tries int) (context.Context, ReleaseFunc, error) {
if l.closed.Load() { if l.closed.Load() {
return ctx, func() context.Context { return ctx }, fmt.Errorf("locker is closed") return func() {}, fmt.Errorf("locker is closed")
} }
originalCtx := ctx
options := []redsync.Option{ options := []redsync.Option{
redsync.WithExpiry(redisLockExpiry), redsync.WithExpiry(redisLockExpiry),
} }
@ -96,18 +89,13 @@ func (l *redisLocker) lock(ctx context.Context, key string, tries int) (context.
} }
mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...) mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...)
if err := mutex.LockContext(ctx); err != nil { if err := mutex.LockContext(ctx); err != nil {
return ctx, func() context.Context { return originalCtx }, err return func() {}, err
} }
ctx, cancel := context.WithCancelCause(ctx) l.mutexM.Store(key, mutex)
l.mutexM.Store(key, &redisMutex{
mutex: mutex,
cancel: cancel,
})
releaseOnce := sync.Once{} releaseOnce := sync.Once{}
return ctx, func() context.Context { return func() {
releaseOnce.Do(func() { releaseOnce.Do(func() {
l.mutexM.Delete(key) l.mutexM.Delete(key)
@ -115,10 +103,7 @@ func (l *redisLocker) lock(ctx context.Context, key string, tries int) (context.
// if it failed to unlock, it will be released automatically after the lock expires. // if it failed to unlock, it will be released automatically after the lock expires.
// Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out. // Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out.
_, _ = mutex.Unlock() _, _ = mutex.Unlock()
cancel(ErrLockReleased)
}) })
return originalCtx
}, nil }, nil
} }
@ -128,16 +113,15 @@ func (l *redisLocker) startExtend() {
return return
} }
toExtend := make([]*redisMutex, 0) toExtend := make([]*redsync.Mutex, 0)
l.mutexM.Range(func(_, value any) bool { l.mutexM.Range(func(_, value any) bool {
m := value.(*redisMutex) m := value.(*redsync.Mutex)
// Extend the lock if it is not expired. // Extend the lock if it is not expired.
// Although the mutex will be removed from the map before it is released, // Although the mutex will be removed from the map before it is released,
// it still can be expired because of a failed extension. // it still can be expired because of a failed extension.
// If it happens, the cancel function should have been called, // If it happens, it does not need to be extended anymore.
// so it does not need to be extended anymore. if time.Now().After(m.Until()) {
if time.Now().After(m.mutex.Until()) {
return true return true
} }
@ -145,9 +129,8 @@ func (l *redisLocker) startExtend() {
return true return true
}) })
for _, v := range toExtend { for _, v := range toExtend {
if ok, err := v.mutex.Extend(); !ok { // If it failed to extend, it will be released automatically after the lock expires.
v.cancel(err) _, _ = v.Extend()
}
} }
time.AfterFunc(redisLockExpiry/2, l.startExtend) time.AfterFunc(redisLockExpiry/2, l.startExtend)

Loading…
Cancel
Save