mirror of https://github.com/go-gitea/gitea
Implement actions artifacts (#22738)
Implement action artifacts server api. This change is used for supporting https://github.com/actions/upload-artifact and https://github.com/actions/download-artifact in gitea actions. It can run sample workflow from doc https://docs.github.com/en/actions/using-workflows/storing-workflow-data-as-artifacts. The api design is inspired by https://github.com/nektos/act/blob/master/pkg/artifacts/server.go and includes some changes from gitea internal structs and methods. Actions artifacts contains two parts: - Gitea server api and storage (this pr implement basic design without some complex cases supports) - Runner communicate with gitea server api (in comming) Old pr https://github.com/go-gitea/gitea/pull/22345 is outdated after actions merged. I create new pr from main branch. ![897f7694-3e0f-4f7c-bb4b-9936624ead45](https://user-images.githubusercontent.com/2142787/219382371-eb3cf810-e4e0-456b-a8ff-aecc2b1a1032.jpeg) Add artifacts list in actions workflow page.pull/24793/head^2
parent
7985cde84d
commit
c757765a9e
@ -0,0 +1,122 @@ |
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
// This artifact server is inspired by https://github.com/nektos/act/blob/master/pkg/artifacts/server.go.
|
||||
// It updates url setting and uses ObjectStore to handle artifacts persistence.
|
||||
|
||||
package actions |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
|
||||
"code.gitea.io/gitea/models/db" |
||||
"code.gitea.io/gitea/modules/timeutil" |
||||
"code.gitea.io/gitea/modules/util" |
||||
) |
||||
|
||||
const ( |
||||
// ArtifactStatusUploadPending is the status of an artifact upload that is pending
|
||||
ArtifactStatusUploadPending = 1 |
||||
// ArtifactStatusUploadConfirmed is the status of an artifact upload that is confirmed
|
||||
ArtifactStatusUploadConfirmed = 2 |
||||
// ArtifactStatusUploadError is the status of an artifact upload that is errored
|
||||
ArtifactStatusUploadError = 3 |
||||
) |
||||
|
||||
func init() { |
||||
db.RegisterModel(new(ActionArtifact)) |
||||
} |
||||
|
||||
// ActionArtifact is a file that is stored in the artifact storage.
|
||||
type ActionArtifact struct { |
||||
ID int64 `xorm:"pk autoincr"` |
||||
RunID int64 `xorm:"index UNIQUE(runid_name)"` // The run id of the artifact
|
||||
RunnerID int64 |
||||
RepoID int64 `xorm:"index"` |
||||
OwnerID int64 |
||||
CommitSHA string |
||||
StoragePath string // The path to the artifact in the storage
|
||||
FileSize int64 // The size of the artifact in bytes
|
||||
FileCompressedSize int64 // The size of the artifact in bytes after gzip compression
|
||||
ContentEncoding string // The content encoding of the artifact
|
||||
ArtifactPath string // The path to the artifact when runner uploads it
|
||||
ArtifactName string `xorm:"UNIQUE(runid_name)"` // The name of the artifact when runner uploads it
|
||||
Status int64 `xorm:"index"` // The status of the artifact, uploading, expired or need-delete
|
||||
CreatedUnix timeutil.TimeStamp `xorm:"created"` |
||||
UpdatedUnix timeutil.TimeStamp `xorm:"updated index"` |
||||
} |
||||
|
||||
// CreateArtifact create a new artifact with task info or get same named artifact in the same run
|
||||
func CreateArtifact(ctx context.Context, t *ActionTask, artifactName string) (*ActionArtifact, error) { |
||||
if err := t.LoadJob(ctx); err != nil { |
||||
return nil, err |
||||
} |
||||
artifact, err := getArtifactByArtifactName(ctx, t.Job.RunID, artifactName) |
||||
if errors.Is(err, util.ErrNotExist) { |
||||
artifact := &ActionArtifact{ |
||||
RunID: t.Job.RunID, |
||||
RunnerID: t.RunnerID, |
||||
RepoID: t.RepoID, |
||||
OwnerID: t.OwnerID, |
||||
CommitSHA: t.CommitSHA, |
||||
Status: ArtifactStatusUploadPending, |
||||
} |
||||
if _, err := db.GetEngine(ctx).Insert(artifact); err != nil { |
||||
return nil, err |
||||
} |
||||
return artifact, nil |
||||
} else if err != nil { |
||||
return nil, err |
||||
} |
||||
return artifact, nil |
||||
} |
||||
|
||||
func getArtifactByArtifactName(ctx context.Context, runID int64, name string) (*ActionArtifact, error) { |
||||
var art ActionArtifact |
||||
has, err := db.GetEngine(ctx).Where("run_id = ? AND artifact_name = ?", runID, name).Get(&art) |
||||
if err != nil { |
||||
return nil, err |
||||
} else if !has { |
||||
return nil, util.ErrNotExist |
||||
} |
||||
return &art, nil |
||||
} |
||||
|
||||
// GetArtifactByID returns an artifact by id
|
||||
func GetArtifactByID(ctx context.Context, id int64) (*ActionArtifact, error) { |
||||
var art ActionArtifact |
||||
has, err := db.GetEngine(ctx).ID(id).Get(&art) |
||||
if err != nil { |
||||
return nil, err |
||||
} else if !has { |
||||
return nil, util.ErrNotExist |
||||
} |
||||
|
||||
return &art, nil |
||||
} |
||||
|
||||
// UpdateArtifactByID updates an artifact by id
|
||||
func UpdateArtifactByID(ctx context.Context, id int64, art *ActionArtifact) error { |
||||
art.ID = id |
||||
_, err := db.GetEngine(ctx).ID(id).AllCols().Update(art) |
||||
return err |
||||
} |
||||
|
||||
// ListArtifactsByRunID returns all artifacts of a run
|
||||
func ListArtifactsByRunID(ctx context.Context, runID int64) ([]*ActionArtifact, error) { |
||||
arts := make([]*ActionArtifact, 0, 10) |
||||
return arts, db.GetEngine(ctx).Where("run_id=?", runID).Find(&arts) |
||||
} |
||||
|
||||
// ListUploadedArtifactsByRunID returns all uploaded artifacts of a run
|
||||
func ListUploadedArtifactsByRunID(ctx context.Context, runID int64) ([]*ActionArtifact, error) { |
||||
arts := make([]*ActionArtifact, 0, 10) |
||||
return arts, db.GetEngine(ctx).Where("run_id=? AND status=?", runID, ArtifactStatusUploadConfirmed).Find(&arts) |
||||
} |
||||
|
||||
// ListArtifactsByRepoID returns all artifacts of a repo
|
||||
func ListArtifactsByRepoID(ctx context.Context, repoID int64) ([]*ActionArtifact, error) { |
||||
arts := make([]*ActionArtifact, 0, 10) |
||||
return arts, db.GetEngine(ctx).Where("repo_id=?", repoID).Find(&arts) |
||||
} |
@ -0,0 +1,19 @@ |
||||
- |
||||
id: 791 |
||||
title: "update actions" |
||||
repo_id: 4 |
||||
owner_id: 1 |
||||
workflow_id: "artifact.yaml" |
||||
index: 187 |
||||
trigger_user_id: 1 |
||||
ref: "refs/heads/master" |
||||
commit_sha: "c2d72f548424103f01ee1dc02889c1e2bff816b0" |
||||
event: "push" |
||||
is_fork_pull_request: 0 |
||||
status: 1 |
||||
started: 1683636528 |
||||
stopped: 1683636626 |
||||
created: 1683636108 |
||||
updated: 1683636626 |
||||
need_approval: 0 |
||||
approved_by: 0 |
@ -0,0 +1,14 @@ |
||||
- |
||||
id: 192 |
||||
run_id: 791 |
||||
repo_id: 4 |
||||
owner_id: 1 |
||||
commit_sha: c2d72f548424103f01ee1dc02889c1e2bff816b0 |
||||
is_fork_pull_request: 0 |
||||
name: job_2 |
||||
attempt: 1 |
||||
job_id: job_2 |
||||
task_id: 47 |
||||
status: 1 |
||||
started: 1683636528 |
||||
stopped: 1683636626 |
@ -0,0 +1,20 @@ |
||||
- |
||||
id: 47 |
||||
job_id: 192 |
||||
attempt: 3 |
||||
runner_id: 1 |
||||
status: 6 # 6 is the status code for "running", running task can upload artifacts |
||||
started: 1683636528 |
||||
stopped: 1683636626 |
||||
repo_id: 4 |
||||
owner_id: 1 |
||||
commit_sha: c2d72f548424103f01ee1dc02889c1e2bff816b0 |
||||
is_fork_pull_request: 0 |
||||
token_hash: 6d8ef48297195edcc8e22c70b3020eaa06c52976db67d39b4260c64a69a2cc1508825121b7b8394e48e00b1bf8718b2a867e |
||||
token_salt: jVuKnSPGgy |
||||
token_last_eight: eeb1a71a |
||||
log_filename: artifact-test2/2f/47.log |
||||
log_in_storage: 1 |
||||
log_length: 707 |
||||
log_size: 90179 |
||||
log_expired: 0 |
@ -0,0 +1,33 @@ |
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package v1_20 //nolint
|
||||
|
||||
import ( |
||||
"code.gitea.io/gitea/modules/timeutil" |
||||
|
||||
"xorm.io/xorm" |
||||
) |
||||
|
||||
func CreateActionArtifactTable(x *xorm.Engine) error { |
||||
// ActionArtifact is a file that is stored in the artifact storage.
|
||||
type ActionArtifact struct { |
||||
ID int64 `xorm:"pk autoincr"` |
||||
RunID int64 `xorm:"index UNIQUE(runid_name)"` // The run id of the artifact
|
||||
RunnerID int64 |
||||
RepoID int64 `xorm:"index"` |
||||
OwnerID int64 |
||||
CommitSHA string |
||||
StoragePath string // The path to the artifact in the storage
|
||||
FileSize int64 // The size of the artifact in bytes
|
||||
FileCompressedSize int64 // The size of the artifact in bytes after gzip compression
|
||||
ContentEncoding string // The content encoding of the artifact
|
||||
ArtifactPath string // The path to the artifact when runner uploads it
|
||||
ArtifactName string `xorm:"UNIQUE(runid_name)"` // The name of the artifact when runner uploads it
|
||||
Status int64 `xorm:"index"` // The status of the artifact
|
||||
CreatedUnix timeutil.TimeStamp `xorm:"created"` |
||||
UpdatedUnix timeutil.TimeStamp `xorm:"updated index"` |
||||
} |
||||
|
||||
return x.Sync(new(ActionArtifact)) |
||||
} |
@ -0,0 +1,587 @@ |
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions |
||||
|
||||
// Github Actions Artifacts API Simple Description
|
||||
//
|
||||
// 1. Upload artifact
|
||||
// 1.1. Post upload url
|
||||
// Post: /api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts?api-version=6.0-preview
|
||||
// Request:
|
||||
// {
|
||||
// "Type": "actions_storage",
|
||||
// "Name": "artifact"
|
||||
// }
|
||||
// Response:
|
||||
// {
|
||||
// "fileContainerResourceUrl":"/api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/upload"
|
||||
// }
|
||||
// it acquires an upload url for artifact upload
|
||||
// 1.2. Upload artifact
|
||||
// PUT: /api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/upload?itemPath=artifact%2Ffilename
|
||||
// it upload chunk with headers:
|
||||
// x-tfs-filelength: 1024 // total file length
|
||||
// content-length: 1024 // chunk length
|
||||
// x-actions-results-md5: md5sum // md5sum of chunk
|
||||
// content-range: bytes 0-1023/1024 // chunk range
|
||||
// we save all chunks to one storage directory after md5sum check
|
||||
// 1.3. Confirm upload
|
||||
// PATCH: /api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/upload?itemPath=artifact%2Ffilename
|
||||
// it confirm upload and merge all chunks to one file, save this file to storage
|
||||
//
|
||||
// 2. Download artifact
|
||||
// 2.1 list artifacts
|
||||
// GET: /api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts?api-version=6.0-preview
|
||||
// Response:
|
||||
// {
|
||||
// "count": 1,
|
||||
// "value": [
|
||||
// {
|
||||
// "name": "artifact",
|
||||
// "fileContainerResourceUrl": "/api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/path"
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// 2.2 download artifact
|
||||
// GET: /api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/path?api-version=6.0-preview
|
||||
// Response:
|
||||
// {
|
||||
// "value": [
|
||||
// {
|
||||
// "contentLocation": "/api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/download",
|
||||
// "path": "artifact/filename",
|
||||
// "itemType": "file"
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// 2.3 download artifact file
|
||||
// GET: /api/actions_pipeline/_apis/pipelines/workflows/{run_id}/artifacts/{artifact_id}/download?itemPath=artifact%2Ffilename
|
||||
// Response:
|
||||
// download file
|
||||
//
|
||||
|
||||
import ( |
||||
"compress/gzip" |
||||
gocontext "context" |
||||
"crypto/md5" |
||||
"encoding/base64" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"net/http" |
||||
"sort" |
||||
"strconv" |
||||
"strings" |
||||
"time" |
||||
|
||||
"code.gitea.io/gitea/models/actions" |
||||
"code.gitea.io/gitea/modules/context" |
||||
"code.gitea.io/gitea/modules/json" |
||||
"code.gitea.io/gitea/modules/log" |
||||
"code.gitea.io/gitea/modules/setting" |
||||
"code.gitea.io/gitea/modules/storage" |
||||
"code.gitea.io/gitea/modules/util" |
||||
"code.gitea.io/gitea/modules/web" |
||||
) |
||||
|
||||
const ( |
||||
artifactXTfsFileLengthHeader = "x-tfs-filelength" |
||||
artifactXActionsResultsMD5Header = "x-actions-results-md5" |
||||
) |
||||
|
||||
const artifactRouteBase = "/_apis/pipelines/workflows/{run_id}/artifacts" |
||||
|
||||
func ArtifactsRoutes(goctx gocontext.Context, prefix string) *web.Route { |
||||
m := web.NewRoute() |
||||
m.Use(withContexter(goctx)) |
||||
|
||||
r := artifactRoutes{ |
||||
prefix: prefix, |
||||
fs: storage.ActionsArtifacts, |
||||
} |
||||
|
||||
m.Group(artifactRouteBase, func() { |
||||
// retrieve, list and confirm artifacts
|
||||
m.Combo("").Get(r.listArtifacts).Post(r.getUploadArtifactURL).Patch(r.comfirmUploadArtifact) |
||||
// handle container artifacts list and download
|
||||
m.Group("/{artifact_id}", func() { |
||||
m.Put("/upload", r.uploadArtifact) |
||||
m.Get("/path", r.getDownloadArtifactURL) |
||||
m.Get("/download", r.downloadArtifact) |
||||
}) |
||||
}) |
||||
|
||||
return m |
||||
} |
||||
|
||||
// withContexter initializes a package context for a request.
|
||||
func withContexter(goctx gocontext.Context) func(next http.Handler) http.Handler { |
||||
return func(next http.Handler) http.Handler { |
||||
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { |
||||
ctx := context.Context{ |
||||
Resp: context.NewResponse(resp), |
||||
Data: map[string]interface{}{}, |
||||
} |
||||
defer ctx.Close() |
||||
|
||||
// action task call server api with Bearer ACTIONS_RUNTIME_TOKEN
|
||||
// we should verify the ACTIONS_RUNTIME_TOKEN
|
||||
authHeader := req.Header.Get("Authorization") |
||||
if len(authHeader) == 0 || !strings.HasPrefix(authHeader, "Bearer ") { |
||||
ctx.Error(http.StatusUnauthorized, "Bad authorization header") |
||||
return |
||||
} |
||||
authToken := strings.TrimPrefix(authHeader, "Bearer ") |
||||
task, err := actions.GetRunningTaskByToken(req.Context(), authToken) |
||||
if err != nil { |
||||
log.Error("Error runner api getting task: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, "Error runner api getting task") |
||||
return |
||||
} |
||||
ctx.Data["task"] = task |
||||
|
||||
if err := task.LoadJob(goctx); err != nil { |
||||
log.Error("Error runner api getting job: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, "Error runner api getting job") |
||||
return |
||||
} |
||||
|
||||
ctx.Req = context.WithContext(req, &ctx) |
||||
|
||||
next.ServeHTTP(ctx.Resp, ctx.Req) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
type artifactRoutes struct { |
||||
prefix string |
||||
fs storage.ObjectStorage |
||||
} |
||||
|
||||
func (ar artifactRoutes) buildArtifactURL(runID, artifactID int64, suffix string) string { |
||||
uploadURL := strings.TrimSuffix(setting.AppURL, "/") + strings.TrimSuffix(ar.prefix, "/") + |
||||
strings.ReplaceAll(artifactRouteBase, "{run_id}", strconv.FormatInt(runID, 10)) + |
||||
"/" + strconv.FormatInt(artifactID, 10) + "/" + suffix |
||||
return uploadURL |
||||
} |
||||
|
||||
type getUploadArtifactRequest struct { |
||||
Type string |
||||
Name string |
||||
} |
||||
|
||||
type getUploadArtifactResponse struct { |
||||
FileContainerResourceURL string `json:"fileContainerResourceUrl"` |
||||
} |
||||
|
||||
func (ar artifactRoutes) validateRunID(ctx *context.Context) (*actions.ActionTask, int64, bool) { |
||||
task, ok := ctx.Data["task"].(*actions.ActionTask) |
||||
if !ok { |
||||
log.Error("Error getting task in context") |
||||
ctx.Error(http.StatusInternalServerError, "Error getting task in context") |
||||
return nil, 0, false |
||||
} |
||||
runID := ctx.ParamsInt64("run_id") |
||||
if task.Job.RunID != runID { |
||||
log.Error("Error runID not match") |
||||
ctx.Error(http.StatusBadRequest, "run-id does not match") |
||||
return nil, 0, false |
||||
} |
||||
return task, runID, true |
||||
} |
||||
|
||||
// getUploadArtifactURL generates a URL for uploading an artifact
|
||||
func (ar artifactRoutes) getUploadArtifactURL(ctx *context.Context) { |
||||
task, runID, ok := ar.validateRunID(ctx) |
||||
if !ok { |
||||
return |
||||
} |
||||
|
||||
var req getUploadArtifactRequest |
||||
if err := json.NewDecoder(ctx.Req.Body).Decode(&req); err != nil { |
||||
log.Error("Error decode request body: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, "Error decode request body") |
||||
return |
||||
} |
||||
|
||||
artifact, err := actions.CreateArtifact(ctx, task, req.Name) |
||||
if err != nil { |
||||
log.Error("Error creating artifact: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
resp := getUploadArtifactResponse{ |
||||
FileContainerResourceURL: ar.buildArtifactURL(runID, artifact.ID, "upload"), |
||||
} |
||||
log.Debug("[artifact] get upload url: %s, artifact id: %d", resp.FileContainerResourceURL, artifact.ID) |
||||
ctx.JSON(http.StatusOK, resp) |
||||
} |
||||
|
||||
// getUploadFileSize returns the size of the file to be uploaded.
|
||||
// The raw size is the size of the file as reported by the header X-TFS-FileLength.
|
||||
func (ar artifactRoutes) getUploadFileSize(ctx *context.Context) (int64, int64, error) { |
||||
contentLength := ctx.Req.ContentLength |
||||
xTfsLength, _ := strconv.ParseInt(ctx.Req.Header.Get(artifactXTfsFileLengthHeader), 10, 64) |
||||
if xTfsLength > 0 { |
||||
return xTfsLength, contentLength, nil |
||||
} |
||||
return contentLength, contentLength, nil |
||||
} |
||||
|
||||
func (ar artifactRoutes) saveUploadChunk(ctx *context.Context, |
||||
artifact *actions.ActionArtifact, |
||||
contentSize, runID int64, |
||||
) (int64, error) { |
||||
contentRange := ctx.Req.Header.Get("Content-Range") |
||||
start, end, length := int64(0), int64(0), int64(0) |
||||
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil { |
||||
return -1, fmt.Errorf("parse content range error: %v", err) |
||||
} |
||||
|
||||
storagePath := fmt.Sprintf("tmp%d/%d-%d-%d.chunk", runID, artifact.ID, start, end) |
||||
|
||||
// use io.TeeReader to avoid reading all body to md5 sum.
|
||||
// it writes data to hasher after reading end
|
||||
// if hash is not matched, delete the read-end result
|
||||
hasher := md5.New() |
||||
r := io.TeeReader(ctx.Req.Body, hasher) |
||||
|
||||
// save chunk to storage
|
||||
writtenSize, err := ar.fs.Save(storagePath, r, -1) |
||||
if err != nil { |
||||
return -1, fmt.Errorf("save chunk to storage error: %v", err) |
||||
} |
||||
|
||||
// check md5
|
||||
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header) |
||||
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) |
||||
log.Debug("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String) |
||||
if reqMd5String != chunkMd5String || writtenSize != contentSize { |
||||
if err := ar.fs.Delete(storagePath); err != nil { |
||||
log.Error("Error deleting chunk: %s, %v", storagePath, err) |
||||
} |
||||
return -1, fmt.Errorf("md5 not match") |
||||
} |
||||
|
||||
log.Debug("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d", |
||||
storagePath, contentSize, artifact.ID, start, end) |
||||
|
||||
return length, nil |
||||
} |
||||
|
||||
// The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/path-and-artifact-name-validation.ts#L32
|
||||
var invalidArtifactNameChars = strings.Join([]string{"\\", "/", "\"", ":", "<", ">", "|", "*", "?", "\r", "\n"}, "") |
||||
|
||||
func (ar artifactRoutes) uploadArtifact(ctx *context.Context) { |
||||
_, runID, ok := ar.validateRunID(ctx) |
||||
if !ok { |
||||
return |
||||
} |
||||
artifactID := ctx.ParamsInt64("artifact_id") |
||||
|
||||
artifact, err := actions.GetArtifactByID(ctx, artifactID) |
||||
if errors.Is(err, util.ErrNotExist) { |
||||
log.Error("Error getting artifact: %v", err) |
||||
ctx.Error(http.StatusNotFound, err.Error()) |
||||
return |
||||
} else if err != nil { |
||||
log.Error("Error getting artifact: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
|
||||
// itemPath is generated from upload-artifact action
|
||||
// it's formatted as {artifact_name}/{artfict_path_in_runner}
|
||||
itemPath := util.PathJoinRel(ctx.Req.URL.Query().Get("itemPath")) |
||||
artifactName := strings.Split(itemPath, "/")[0] |
||||
|
||||
// checkArtifactName checks if the artifact name contains invalid characters.
|
||||
// If the name contains invalid characters, an error is returned.
|
||||
if strings.ContainsAny(artifactName, invalidArtifactNameChars) { |
||||
log.Error("Error checking artifact name contains invalid character") |
||||
ctx.Error(http.StatusBadRequest, err.Error()) |
||||
return |
||||
} |
||||
|
||||
// get upload file size
|
||||
fileSize, contentLength, err := ar.getUploadFileSize(ctx) |
||||
if err != nil { |
||||
log.Error("Error getting upload file size: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
|
||||
// save chunk
|
||||
chunkAllLength, err := ar.saveUploadChunk(ctx, artifact, contentLength, runID) |
||||
if err != nil { |
||||
log.Error("Error saving upload chunk: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
|
||||
// if artifact name is not set, update it
|
||||
if artifact.ArtifactName == "" { |
||||
artifact.ArtifactName = artifactName |
||||
artifact.ArtifactPath = itemPath // path in container
|
||||
artifact.FileSize = fileSize // this is total size of all chunks
|
||||
artifact.FileCompressedSize = chunkAllLength |
||||
artifact.ContentEncoding = ctx.Req.Header.Get("Content-Encoding") |
||||
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil { |
||||
log.Error("Error updating artifact: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
} |
||||
|
||||
ctx.JSON(http.StatusOK, map[string]string{ |
||||
"message": "success", |
||||
}) |
||||
} |
||||
|
||||
// comfirmUploadArtifact comfirm upload artifact.
|
||||
// if all chunks are uploaded, merge them to one file.
|
||||
func (ar artifactRoutes) comfirmUploadArtifact(ctx *context.Context) { |
||||
_, runID, ok := ar.validateRunID(ctx) |
||||
if !ok { |
||||
return |
||||
} |
||||
if err := ar.mergeArtifactChunks(ctx, runID); err != nil { |
||||
log.Error("Error merging chunks: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
|
||||
ctx.JSON(http.StatusOK, map[string]string{ |
||||
"message": "success", |
||||
}) |
||||
} |
||||
|
||||
type chunkItem struct { |
||||
ArtifactID int64 |
||||
Start int64 |
||||
End int64 |
||||
Path string |
||||
} |
||||
|
||||
func (ar artifactRoutes) mergeArtifactChunks(ctx *context.Context, runID int64) error { |
||||
storageDir := fmt.Sprintf("tmp%d", runID) |
||||
var chunks []*chunkItem |
||||
if err := ar.fs.IterateObjects(storageDir, func(path string, obj storage.Object) error { |
||||
item := chunkItem{Path: path} |
||||
if _, err := fmt.Sscanf(path, storageDir+"/%d-%d-%d.chunk", &item.ArtifactID, &item.Start, &item.End); err != nil { |
||||
return fmt.Errorf("parse content range error: %v", err) |
||||
} |
||||
chunks = append(chunks, &item) |
||||
return nil |
||||
}); err != nil { |
||||
return err |
||||
} |
||||
// group chunks by artifact id
|
||||
chunksMap := make(map[int64][]*chunkItem) |
||||
for _, c := range chunks { |
||||
chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c) |
||||
} |
||||
|
||||
for artifactID, cs := range chunksMap { |
||||
// get artifact to handle merged chunks
|
||||
artifact, err := actions.GetArtifactByID(ctx, cs[0].ArtifactID) |
||||
if err != nil { |
||||
return fmt.Errorf("get artifact error: %v", err) |
||||
} |
||||
|
||||
sort.Slice(cs, func(i, j int) bool { |
||||
return cs[i].Start < cs[j].Start |
||||
}) |
||||
|
||||
allChunks := make([]*chunkItem, 0) |
||||
startAt := int64(-1) |
||||
// check if all chunks are uploaded and in order and clean repeated chunks
|
||||
for _, c := range cs { |
||||
// startAt is -1 means this is the first chunk
|
||||
// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
|
||||
// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
|
||||
if c.Start == (startAt + 1) { |
||||
allChunks = append(allChunks, c) |
||||
startAt = c.End |
||||
} |
||||
} |
||||
|
||||
// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
|
||||
if startAt+1 != artifact.FileCompressedSize { |
||||
log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifactID) |
||||
break |
||||
} |
||||
|
||||
// use multiReader
|
||||
readers := make([]io.Reader, 0, len(allChunks)) |
||||
readerClosers := make([]io.Closer, 0, len(allChunks)) |
||||
for _, c := range allChunks { |
||||
reader, err := ar.fs.Open(c.Path) |
||||
if err != nil { |
||||
return fmt.Errorf("open chunk error: %v, %s", err, c.Path) |
||||
} |
||||
readers = append(readers, reader) |
||||
readerClosers = append(readerClosers, reader) |
||||
} |
||||
mergedReader := io.MultiReader(readers...) |
||||
|
||||
// if chunk is gzip, decompress it
|
||||
if artifact.ContentEncoding == "gzip" { |
||||
var err error |
||||
mergedReader, err = gzip.NewReader(mergedReader) |
||||
if err != nil { |
||||
return fmt.Errorf("gzip reader error: %v", err) |
||||
} |
||||
} |
||||
|
||||
// save merged file
|
||||
storagePath := fmt.Sprintf("%d/%d/%d.chunk", runID%255, artifactID%255, time.Now().UnixNano()) |
||||
written, err := ar.fs.Save(storagePath, mergedReader, -1) |
||||
if err != nil { |
||||
return fmt.Errorf("save merged file error: %v", err) |
||||
} |
||||
if written != artifact.FileSize { |
||||
return fmt.Errorf("merged file size is not equal to chunk length") |
||||
} |
||||
|
||||
// close readers
|
||||
for _, r := range readerClosers { |
||||
r.Close() |
||||
} |
||||
|
||||
// save storage path to artifact
|
||||
log.Debug("[artifact] merge chunks to artifact: %d, %s", artifact.ID, storagePath) |
||||
artifact.StoragePath = storagePath |
||||
artifact.Status = actions.ArtifactStatusUploadConfirmed |
||||
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil { |
||||
return fmt.Errorf("update artifact error: %v", err) |
||||
} |
||||
|
||||
// drop chunks
|
||||
for _, c := range cs { |
||||
if err := ar.fs.Delete(c.Path); err != nil { |
||||
return fmt.Errorf("delete chunk file error: %v", err) |
||||
} |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
type ( |
||||
listArtifactsResponse struct { |
||||
Count int64 `json:"count"` |
||||
Value []listArtifactsResponseItem `json:"value"` |
||||
} |
||||
listArtifactsResponseItem struct { |
||||
Name string `json:"name"` |
||||
FileContainerResourceURL string `json:"fileContainerResourceUrl"` |
||||
} |
||||
) |
||||
|
||||
func (ar artifactRoutes) listArtifacts(ctx *context.Context) { |
||||
_, runID, ok := ar.validateRunID(ctx) |
||||
if !ok { |
||||
return |
||||
} |
||||
|
||||
artficats, err := actions.ListArtifactsByRunID(ctx, runID) |
||||
if err != nil { |
||||
log.Error("Error getting artifacts: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
|
||||
artficatsData := make([]listArtifactsResponseItem, 0, len(artficats)) |
||||
for _, a := range artficats { |
||||
artficatsData = append(artficatsData, listArtifactsResponseItem{ |
||||
Name: a.ArtifactName, |
||||
FileContainerResourceURL: ar.buildArtifactURL(runID, a.ID, "path"), |
||||
}) |
||||
} |
||||
respData := listArtifactsResponse{ |
||||
Count: int64(len(artficatsData)), |
||||
Value: artficatsData, |
||||
} |
||||
ctx.JSON(http.StatusOK, respData) |
||||
} |
||||
|
||||
type ( |
||||
downloadArtifactResponse struct { |
||||
Value []downloadArtifactResponseItem `json:"value"` |
||||
} |
||||
downloadArtifactResponseItem struct { |
||||
Path string `json:"path"` |
||||
ItemType string `json:"itemType"` |
||||
ContentLocation string `json:"contentLocation"` |
||||
} |
||||
) |
||||
|
||||
func (ar artifactRoutes) getDownloadArtifactURL(ctx *context.Context) { |
||||
_, runID, ok := ar.validateRunID(ctx) |
||||
if !ok { |
||||
return |
||||
} |
||||
|
||||
artifactID := ctx.ParamsInt64("artifact_id") |
||||
artifact, err := actions.GetArtifactByID(ctx, artifactID) |
||||
if errors.Is(err, util.ErrNotExist) { |
||||
log.Error("Error getting artifact: %v", err) |
||||
ctx.Error(http.StatusNotFound, err.Error()) |
||||
return |
||||
} else if err != nil { |
||||
log.Error("Error getting artifact: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
downloadURL := ar.buildArtifactURL(runID, artifact.ID, "download") |
||||
itemPath := util.PathJoinRel(ctx.Req.URL.Query().Get("itemPath")) |
||||
respData := downloadArtifactResponse{ |
||||
Value: []downloadArtifactResponseItem{{ |
||||
Path: util.PathJoinRel(itemPath, artifact.ArtifactPath), |
||||
ItemType: "file", |
||||
ContentLocation: downloadURL, |
||||
}}, |
||||
} |
||||
ctx.JSON(http.StatusOK, respData) |
||||
} |
||||
|
||||
func (ar artifactRoutes) downloadArtifact(ctx *context.Context) { |
||||
_, runID, ok := ar.validateRunID(ctx) |
||||
if !ok { |
||||
return |
||||
} |
||||
|
||||
artifactID := ctx.ParamsInt64("artifact_id") |
||||
artifact, err := actions.GetArtifactByID(ctx, artifactID) |
||||
if errors.Is(err, util.ErrNotExist) { |
||||
log.Error("Error getting artifact: %v", err) |
||||
ctx.Error(http.StatusNotFound, err.Error()) |
||||
return |
||||
} else if err != nil { |
||||
log.Error("Error getting artifact: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
if artifact.RunID != runID { |
||||
log.Error("Error dismatch runID and artifactID, task: %v, artifact: %v", runID, artifactID) |
||||
ctx.Error(http.StatusBadRequest, err.Error()) |
||||
return |
||||
} |
||||
|
||||
fd, err := ar.fs.Open(artifact.StoragePath) |
||||
if err != nil { |
||||
log.Error("Error opening file: %v", err) |
||||
ctx.Error(http.StatusInternalServerError, err.Error()) |
||||
return |
||||
} |
||||
defer fd.Close() |
||||
|
||||
if strings.HasSuffix(artifact.ArtifactPath, ".gz") { |
||||
ctx.Resp.Header().Set("Content-Encoding", "gzip") |
||||
} |
||||
ctx.ServeContent(fd, &context.ServeHeaderOptions{ |
||||
Filename: artifact.ArtifactName, |
||||
LastModified: artifact.CreatedUnix.AsLocalTime(), |
||||
}) |
||||
} |
@ -0,0 +1,143 @@ |
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package integration |
||||
|
||||
import ( |
||||
"net/http" |
||||
"strings" |
||||
"testing" |
||||
|
||||
"code.gitea.io/gitea/tests" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func TestActionsArtifactUpload(t *testing.T) { |
||||
defer tests.PrepareTestEnv(t)() |
||||
|
||||
type uploadArtifactResponse struct { |
||||
FileContainerResourceURL string `json:"fileContainerResourceUrl"` |
||||
} |
||||
|
||||
type getUploadArtifactRequest struct { |
||||
Type string |
||||
Name string |
||||
} |
||||
|
||||
// acquire artifact upload url
|
||||
req := NewRequestWithJSON(t, "POST", "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts", getUploadArtifactRequest{ |
||||
Type: "actions_storage", |
||||
Name: "artifact", |
||||
}) |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
resp := MakeRequest(t, req, http.StatusOK) |
||||
var uploadResp uploadArtifactResponse |
||||
DecodeJSON(t, resp, &uploadResp) |
||||
assert.Contains(t, uploadResp.FileContainerResourceURL, "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts") |
||||
|
||||
// get upload url
|
||||
idx := strings.Index(uploadResp.FileContainerResourceURL, "/api/actions_pipeline/_apis/pipelines/") |
||||
url := uploadResp.FileContainerResourceURL[idx:] + "?itemPath=artifact/abc.txt" |
||||
|
||||
// upload artifact chunk
|
||||
body := strings.Repeat("A", 1024) |
||||
req = NewRequestWithBody(t, "PUT", url, strings.NewReader(body)) |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
req.Header.Add("Content-Range", "bytes 0-1023/1024") |
||||
req.Header.Add("x-tfs-filelength", "1024") |
||||
req.Header.Add("x-actions-results-md5", "1HsSe8LeLWh93ILaw1TEFQ==") // base64(md5(body))
|
||||
MakeRequest(t, req, http.StatusOK) |
||||
|
||||
t.Logf("Create artifact confirm") |
||||
|
||||
// confirm artifact upload
|
||||
req = NewRequest(t, "PATCH", "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts") |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
MakeRequest(t, req, http.StatusOK) |
||||
} |
||||
|
||||
func TestActionsArtifactUploadNotExist(t *testing.T) { |
||||
defer tests.PrepareTestEnv(t)() |
||||
|
||||
// artifact id 54321 not exist
|
||||
url := "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts/54321/upload?itemPath=artifact/abc.txt" |
||||
body := strings.Repeat("A", 1024) |
||||
req := NewRequestWithBody(t, "PUT", url, strings.NewReader(body)) |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
req.Header.Add("Content-Range", "bytes 0-1023/1024") |
||||
req.Header.Add("x-tfs-filelength", "1024") |
||||
req.Header.Add("x-actions-results-md5", "1HsSe8LeLWh93ILaw1TEFQ==") // base64(md5(body))
|
||||
MakeRequest(t, req, http.StatusNotFound) |
||||
} |
||||
|
||||
func TestActionsArtifactConfirmUpload(t *testing.T) { |
||||
defer tests.PrepareTestEnv(t)() |
||||
|
||||
req := NewRequest(t, "PATCH", "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts") |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
resp := MakeRequest(t, req, http.StatusOK) |
||||
assert.Contains(t, resp.Body.String(), "success") |
||||
} |
||||
|
||||
func TestActionsArtifactUploadWithoutToken(t *testing.T) { |
||||
defer tests.PrepareTestEnv(t)() |
||||
|
||||
req := NewRequestWithJSON(t, "POST", "/api/actions_pipeline/_apis/pipelines/workflows/1/artifacts", nil) |
||||
MakeRequest(t, req, http.StatusUnauthorized) |
||||
} |
||||
|
||||
func TestActionsArtifactDownload(t *testing.T) { |
||||
defer tests.PrepareTestEnv(t)() |
||||
|
||||
type ( |
||||
listArtifactsResponseItem struct { |
||||
Name string `json:"name"` |
||||
FileContainerResourceURL string `json:"fileContainerResourceUrl"` |
||||
} |
||||
listArtifactsResponse struct { |
||||
Count int64 `json:"count"` |
||||
Value []listArtifactsResponseItem `json:"value"` |
||||
} |
||||
) |
||||
|
||||
req := NewRequest(t, "GET", "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts") |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
resp := MakeRequest(t, req, http.StatusOK) |
||||
var listResp listArtifactsResponse |
||||
DecodeJSON(t, resp, &listResp) |
||||
assert.Equal(t, int64(1), listResp.Count) |
||||
assert.Equal(t, "artifact", listResp.Value[0].Name) |
||||
assert.Contains(t, listResp.Value[0].FileContainerResourceURL, "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts") |
||||
|
||||
type ( |
||||
downloadArtifactResponseItem struct { |
||||
Path string `json:"path"` |
||||
ItemType string `json:"itemType"` |
||||
ContentLocation string `json:"contentLocation"` |
||||
} |
||||
downloadArtifactResponse struct { |
||||
Value []downloadArtifactResponseItem `json:"value"` |
||||
} |
||||
) |
||||
|
||||
idx := strings.Index(listResp.Value[0].FileContainerResourceURL, "/api/actions_pipeline/_apis/pipelines/") |
||||
url := listResp.Value[0].FileContainerResourceURL[idx+1:] |
||||
req = NewRequest(t, "GET", url) |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
resp = MakeRequest(t, req, http.StatusOK) |
||||
var downloadResp downloadArtifactResponse |
||||
DecodeJSON(t, resp, &downloadResp) |
||||
assert.Len(t, downloadResp.Value, 1) |
||||
assert.Equal(t, "artifact/abc.txt", downloadResp.Value[0].Path) |
||||
assert.Equal(t, "file", downloadResp.Value[0].ItemType) |
||||
assert.Contains(t, downloadResp.Value[0].ContentLocation, "/api/actions_pipeline/_apis/pipelines/workflows/791/artifacts") |
||||
|
||||
idx = strings.Index(downloadResp.Value[0].ContentLocation, "/api/actions_pipeline/_apis/pipelines/") |
||||
url = downloadResp.Value[0].ContentLocation[idx:] |
||||
req = NewRequest(t, "GET", url) |
||||
req = addTokenAuthHeader(req, "Bearer 8061e833a55f6fc0157c98b883e91fcfeeb1a71a") |
||||
resp = MakeRequest(t, req, http.StatusOK) |
||||
body := strings.Repeat("A", 1024) |
||||
assert.Equal(t, resp.Body.String(), body) |
||||
} |
Loading…
Reference in new issue