mirror of https://github.com/go-gitea/gitea
Azure blob storage support (#30995)
This PR implemented object storages(LFS/Packages/Attachments and etc.) for Azure Blob Storage. It depends on azure official golang SDK and can support both the azure blob storage cloud service and azurite mock server. Replace #25458 Fix #22527 - [x] CI Tests - [x] integration test, MSSQL integration tests will now based on azureblob - [x] unit test - [x] CLI Migrate Storage - [x] Documentation for configuration added ------ TODO (other PRs): - [ ] Improve performance of `blob download`. --------- Co-authored-by: yp05327 <576951401@qq.com>pull/31174/head^2
parent
015efcd8bf
commit
fb7b743bd0
File diff suppressed because one or more lines are too long
@ -0,0 +1,322 @@ |
|||||||
|
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package storage |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"errors" |
||||||
|
"fmt" |
||||||
|
"io" |
||||||
|
"net/url" |
||||||
|
"os" |
||||||
|
"path" |
||||||
|
"strings" |
||||||
|
"time" |
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log" |
||||||
|
"code.gitea.io/gitea/modules/setting" |
||||||
|
"code.gitea.io/gitea/modules/util" |
||||||
|
|
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore" |
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" |
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" |
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" |
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" |
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" |
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas" |
||||||
|
) |
||||||
|
|
||||||
|
var _ Object = &azureBlobObject{} |
||||||
|
|
||||||
|
type azureBlobObject struct { |
||||||
|
blobClient *blob.Client |
||||||
|
Context context.Context |
||||||
|
Name string |
||||||
|
Size int64 |
||||||
|
ModTime *time.Time |
||||||
|
offset int64 |
||||||
|
} |
||||||
|
|
||||||
|
func (a *azureBlobObject) Read(p []byte) (int, error) { |
||||||
|
// TODO: improve the performance, we can implement another interface, maybe implement io.WriteTo
|
||||||
|
if a.offset >= a.Size { |
||||||
|
return 0, io.EOF |
||||||
|
} |
||||||
|
count := min(int64(len(p)), a.Size-a.offset) |
||||||
|
|
||||||
|
res, err := a.blobClient.DownloadBuffer(a.Context, p, &blob.DownloadBufferOptions{ |
||||||
|
Range: blob.HTTPRange{ |
||||||
|
Offset: a.offset, |
||||||
|
Count: count, |
||||||
|
}, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
return 0, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
a.offset += res |
||||||
|
|
||||||
|
return int(res), nil |
||||||
|
} |
||||||
|
|
||||||
|
func (a *azureBlobObject) Close() error { |
||||||
|
a.offset = 0 |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (a *azureBlobObject) Seek(offset int64, whence int) (int64, error) { |
||||||
|
switch whence { |
||||||
|
case io.SeekStart: |
||||||
|
case io.SeekCurrent: |
||||||
|
offset += a.offset |
||||||
|
case io.SeekEnd: |
||||||
|
offset = a.Size - offset |
||||||
|
default: |
||||||
|
return 0, errors.New("Seek: invalid whence") |
||||||
|
} |
||||||
|
|
||||||
|
if offset > a.Size { |
||||||
|
return 0, errors.New("Seek: invalid offset") |
||||||
|
} else if offset < 0 { |
||||||
|
return 0, errors.New("Seek: invalid offset") |
||||||
|
} |
||||||
|
a.offset = offset |
||||||
|
return a.offset, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (a *azureBlobObject) Stat() (os.FileInfo, error) { |
||||||
|
return &azureBlobFileInfo{ |
||||||
|
a.Name, |
||||||
|
a.Size, |
||||||
|
*a.ModTime, |
||||||
|
}, nil |
||||||
|
} |
||||||
|
|
||||||
|
var _ ObjectStorage = &AzureBlobStorage{} |
||||||
|
|
||||||
|
// AzureStorage returns a azure blob storage
|
||||||
|
type AzureBlobStorage struct { |
||||||
|
cfg *setting.AzureBlobStorageConfig |
||||||
|
ctx context.Context |
||||||
|
credential *azblob.SharedKeyCredential |
||||||
|
client *azblob.Client |
||||||
|
} |
||||||
|
|
||||||
|
func convertAzureBlobErr(err error) error { |
||||||
|
if err == nil { |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
if bloberror.HasCode(err, bloberror.BlobNotFound) { |
||||||
|
return os.ErrNotExist |
||||||
|
} |
||||||
|
var respErr *azcore.ResponseError |
||||||
|
if !errors.As(err, &respErr) { |
||||||
|
return err |
||||||
|
} |
||||||
|
return fmt.Errorf(respErr.ErrorCode) |
||||||
|
} |
||||||
|
|
||||||
|
// NewAzureBlobStorage returns a azure blob storage
|
||||||
|
func NewAzureBlobStorage(ctx context.Context, cfg *setting.Storage) (ObjectStorage, error) { |
||||||
|
config := cfg.AzureBlobConfig |
||||||
|
|
||||||
|
log.Info("Creating Azure Blob storage at %s:%s with base path %s", config.Endpoint, config.Container, config.BasePath) |
||||||
|
|
||||||
|
cred, err := azblob.NewSharedKeyCredential(config.AccountName, config.AccountKey) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
client, err := azblob.NewClientWithSharedKeyCredential(config.Endpoint, cred, &azblob.ClientOptions{}) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
|
||||||
|
_, err = client.CreateContainer(ctx, config.Container, &container.CreateOptions{}) |
||||||
|
if err != nil { |
||||||
|
// Check to see if we already own this container (which happens if you run this twice)
|
||||||
|
if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) { |
||||||
|
return nil, convertMinioErr(err) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
return &AzureBlobStorage{ |
||||||
|
cfg: &config, |
||||||
|
ctx: ctx, |
||||||
|
credential: cred, |
||||||
|
client: client, |
||||||
|
}, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (a *AzureBlobStorage) buildAzureBlobPath(p string) string { |
||||||
|
p = util.PathJoinRelX(a.cfg.BasePath, p) |
||||||
|
if p == "." || p == "/" { |
||||||
|
p = "" // azure uses prefix, so path should be empty as relative path
|
||||||
|
} |
||||||
|
return p |
||||||
|
} |
||||||
|
|
||||||
|
func (a *AzureBlobStorage) getObjectNameFromPath(path string) string { |
||||||
|
s := strings.Split(path, "/") |
||||||
|
return s[len(s)-1] |
||||||
|
} |
||||||
|
|
||||||
|
// Open opens a file
|
||||||
|
func (a *AzureBlobStorage) Open(path string) (Object, error) { |
||||||
|
blobClient, err := a.getBlobClient(path) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
res, err := blobClient.GetProperties(a.ctx, &blob.GetPropertiesOptions{}) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
return &azureBlobObject{ |
||||||
|
Context: a.ctx, |
||||||
|
blobClient: blobClient, |
||||||
|
Name: a.getObjectNameFromPath(path), |
||||||
|
Size: *res.ContentLength, |
||||||
|
ModTime: res.LastModified, |
||||||
|
}, nil |
||||||
|
} |
||||||
|
|
||||||
|
// Save saves a file to azure blob storage
|
||||||
|
func (a *AzureBlobStorage) Save(path string, r io.Reader, size int64) (int64, error) { |
||||||
|
rd := util.NewCountingReader(r) |
||||||
|
_, err := a.client.UploadStream( |
||||||
|
a.ctx, |
||||||
|
a.cfg.Container, |
||||||
|
a.buildAzureBlobPath(path), |
||||||
|
rd, |
||||||
|
// TODO: support set block size and concurrency
|
||||||
|
&blockblob.UploadStreamOptions{}, |
||||||
|
) |
||||||
|
if err != nil { |
||||||
|
return 0, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
return int64(rd.Count()), nil |
||||||
|
} |
||||||
|
|
||||||
|
type azureBlobFileInfo struct { |
||||||
|
name string |
||||||
|
size int64 |
||||||
|
modTime time.Time |
||||||
|
} |
||||||
|
|
||||||
|
func (a azureBlobFileInfo) Name() string { |
||||||
|
return path.Base(a.name) |
||||||
|
} |
||||||
|
|
||||||
|
func (a azureBlobFileInfo) Size() int64 { |
||||||
|
return a.size |
||||||
|
} |
||||||
|
|
||||||
|
func (a azureBlobFileInfo) ModTime() time.Time { |
||||||
|
return a.modTime |
||||||
|
} |
||||||
|
|
||||||
|
func (a azureBlobFileInfo) IsDir() bool { |
||||||
|
return strings.HasSuffix(a.name, "/") |
||||||
|
} |
||||||
|
|
||||||
|
func (a azureBlobFileInfo) Mode() os.FileMode { |
||||||
|
return os.ModePerm |
||||||
|
} |
||||||
|
|
||||||
|
func (a azureBlobFileInfo) Sys() any { |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// Stat returns the stat information of the object
|
||||||
|
func (a *AzureBlobStorage) Stat(path string) (os.FileInfo, error) { |
||||||
|
blobClient, err := a.getBlobClient(path) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
res, err := blobClient.GetProperties(a.ctx, &blob.GetPropertiesOptions{}) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
s := strings.Split(path, "/") |
||||||
|
return &azureBlobFileInfo{ |
||||||
|
s[len(s)-1], |
||||||
|
*res.ContentLength, |
||||||
|
*res.LastModified, |
||||||
|
}, nil |
||||||
|
} |
||||||
|
|
||||||
|
// Delete delete a file
|
||||||
|
func (a *AzureBlobStorage) Delete(path string) error { |
||||||
|
blobClient, err := a.getBlobClient(path) |
||||||
|
if err != nil { |
||||||
|
return convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
_, err = blobClient.Delete(a.ctx, nil) |
||||||
|
return convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
|
||||||
|
// URL gets the redirect URL to a file. The presigned link is valid for 5 minutes.
|
||||||
|
func (a *AzureBlobStorage) URL(path, name string) (*url.URL, error) { |
||||||
|
blobClient, err := a.getBlobClient(path) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
|
||||||
|
startTime := time.Now() |
||||||
|
u, err := blobClient.GetSASURL(sas.BlobPermissions{ |
||||||
|
Read: true, |
||||||
|
}, time.Now().Add(5*time.Minute), &blob.GetSASURLOptions{ |
||||||
|
StartTime: &startTime, |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
return nil, convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
|
||||||
|
return url.Parse(u) |
||||||
|
} |
||||||
|
|
||||||
|
// IterateObjects iterates across the objects in the azureblobstorage
|
||||||
|
func (a *AzureBlobStorage) IterateObjects(dirName string, fn func(path string, obj Object) error) error { |
||||||
|
dirName = a.buildAzureBlobPath(dirName) |
||||||
|
if dirName != "" { |
||||||
|
dirName += "/" |
||||||
|
} |
||||||
|
pager := a.client.NewListBlobsFlatPager(a.cfg.Container, &container.ListBlobsFlatOptions{ |
||||||
|
Prefix: &dirName, |
||||||
|
}) |
||||||
|
for pager.More() { |
||||||
|
resp, err := pager.NextPage(a.ctx) |
||||||
|
if err != nil { |
||||||
|
return convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
for _, object := range resp.Segment.BlobItems { |
||||||
|
blobClient, err := a.getBlobClient(*object.Name) |
||||||
|
if err != nil { |
||||||
|
return convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
object := &azureBlobObject{ |
||||||
|
Context: a.ctx, |
||||||
|
blobClient: blobClient, |
||||||
|
Name: *object.Name, |
||||||
|
Size: *object.Properties.ContentLength, |
||||||
|
ModTime: object.Properties.LastModified, |
||||||
|
} |
||||||
|
if err := func(object *azureBlobObject, fn func(path string, obj Object) error) error { |
||||||
|
defer object.Close() |
||||||
|
return fn(strings.TrimPrefix(object.Name, a.cfg.BasePath), object) |
||||||
|
}(object, fn); err != nil { |
||||||
|
return convertAzureBlobErr(err) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// Delete delete a file
|
||||||
|
func (a *AzureBlobStorage) getBlobClient(path string) (*blob.Client, error) { |
||||||
|
return a.client.ServiceClient().NewContainerClient(a.cfg.Container).NewBlobClient(a.buildAzureBlobPath(path)), nil |
||||||
|
} |
||||||
|
|
||||||
|
func init() { |
||||||
|
RegisterStorageType(setting.AzureBlobStorageType, NewAzureBlobStorage) |
||||||
|
} |
@ -0,0 +1,56 @@ |
|||||||
|
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package storage |
||||||
|
|
||||||
|
import ( |
||||||
|
"os" |
||||||
|
"testing" |
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/setting" |
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert" |
||||||
|
) |
||||||
|
|
||||||
|
func TestAzureBlobStorageIterator(t *testing.T) { |
||||||
|
if os.Getenv("CI") == "" { |
||||||
|
t.Skip("azureBlobStorage not present outside of CI") |
||||||
|
return |
||||||
|
} |
||||||
|
testStorageIterator(t, setting.AzureBlobStorageType, &setting.Storage{ |
||||||
|
AzureBlobConfig: setting.AzureBlobStorageConfig{ |
||||||
|
// https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio-code#ip-style-url
|
||||||
|
Endpoint: "http://devstoreaccount1.azurite.local:10000", |
||||||
|
// https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio-code#well-known-storage-account-and-key
|
||||||
|
AccountName: "devstoreaccount1", |
||||||
|
AccountKey: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", |
||||||
|
Container: "test", |
||||||
|
}, |
||||||
|
}) |
||||||
|
} |
||||||
|
|
||||||
|
func TestAzureBlobStoragePath(t *testing.T) { |
||||||
|
m := &AzureBlobStorage{cfg: &setting.AzureBlobStorageConfig{BasePath: ""}} |
||||||
|
assert.Equal(t, "", m.buildAzureBlobPath("/")) |
||||||
|
assert.Equal(t, "", m.buildAzureBlobPath(".")) |
||||||
|
assert.Equal(t, "a", m.buildAzureBlobPath("/a")) |
||||||
|
assert.Equal(t, "a/b", m.buildAzureBlobPath("/a/b/")) |
||||||
|
|
||||||
|
m = &AzureBlobStorage{cfg: &setting.AzureBlobStorageConfig{BasePath: "/"}} |
||||||
|
assert.Equal(t, "", m.buildAzureBlobPath("/")) |
||||||
|
assert.Equal(t, "", m.buildAzureBlobPath(".")) |
||||||
|
assert.Equal(t, "a", m.buildAzureBlobPath("/a")) |
||||||
|
assert.Equal(t, "a/b", m.buildAzureBlobPath("/a/b/")) |
||||||
|
|
||||||
|
m = &AzureBlobStorage{cfg: &setting.AzureBlobStorageConfig{BasePath: "/base"}} |
||||||
|
assert.Equal(t, "base", m.buildAzureBlobPath("/")) |
||||||
|
assert.Equal(t, "base", m.buildAzureBlobPath(".")) |
||||||
|
assert.Equal(t, "base/a", m.buildAzureBlobPath("/a")) |
||||||
|
assert.Equal(t, "base/a/b", m.buildAzureBlobPath("/a/b/")) |
||||||
|
|
||||||
|
m = &AzureBlobStorage{cfg: &setting.AzureBlobStorageConfig{BasePath: "/base/"}} |
||||||
|
assert.Equal(t, "base", m.buildAzureBlobPath("/")) |
||||||
|
assert.Equal(t, "base", m.buildAzureBlobPath(".")) |
||||||
|
assert.Equal(t, "base/a", m.buildAzureBlobPath("/a")) |
||||||
|
assert.Equal(t, "base/a/b", m.buildAzureBlobPath("/a/b/")) |
||||||
|
} |
Loading…
Reference in new issue