mirror of https://github.com/go-gitea/gitea
Actions Artifacts support uploading multiple files and directories (#24874)
The current Actions artifacts implementation only supports single-file artifacts. To support uploading multiple files, it needs to:
- save each file in its own db record with the same run-id, the same artifact-name, and a proper artifact-path (see the sketch below)
- change the artifact upload url so it no longer contains an artifact-id, since multiple files create multiple artifact-ids
- support `path` in the download-artifact action, so an artifact is downloaded to `{path}/{artifact-path}`
- in the repo action view, provide a zip download link in the artifacts list on the summary page, whether the artifact contains a single file or multiple files
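As a minimal illustration (not Gitea code; record type and values are hypothetical, only the column roles follow the ActionArtifact fields changed in this commit), one multi-file artifact becomes several records sharing run-id and artifact-name and differing in artifact-path:

package main

import "fmt"

// artifactRecord is a hypothetical stand-in for one db row per uploaded file.
type artifactRecord struct {
	RunID        int64
	ArtifactName string
	ArtifactPath string
}

func main() {
	records := []artifactRecord{
		{RunID: 42, ArtifactName: "test-reports", ArtifactPath: "unit/results.xml"},
		{RunID: 42, ArtifactName: "test-reports", ArtifactPath: "e2e/results.xml"},
	}
	// download-artifact with path "out" would restore files to {path}/{artifact-path}
	for _, r := range records {
		fmt.Printf("out/%s\n", r.ArtifactPath) // out/unit/results.xml, out/e2e/results.xml
	}
}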
parent 3acaaa29dd
commit f3d293d2bb
@@ -0,0 +1,19 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_21 //nolint

import (
	"xorm.io/xorm"
)

func AlterActionArtifactTable(x *xorm.Engine) error {
	// ActionArtifact is a file that is stored in the artifact storage.
	type ActionArtifact struct {
		RunID        int64  `xorm:"index unique(runid_name_path)"` // The run id of the artifact
		ArtifactPath string `xorm:"index unique(runid_name_path)"` // The path to the artifact when the runner uploads it
		ArtifactName string `xorm:"index unique(runid_name_path)"` // The name of the artifact
	}
	return x.Sync(new(ActionArtifact))
}
@@ -0,0 +1,187 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"io"
	"sort"
	"time"

	"code.gitea.io/gitea/models/actions"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/storage"
)

func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	contentSize, runID int64,
) (int64, error) {
	// parse content-range header, format: bytes 0-1023/146515
	contentRange := ctx.Req.Header.Get("Content-Range")
	start, end, length := int64(0), int64(0), int64(0)
	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
		return -1, fmt.Errorf("parse content range error: %v", err)
	}
	// build chunk store path
	storagePath := fmt.Sprintf("tmp%d/%d-%d-%d.chunk", runID, artifact.ID, start, end)
	// use io.TeeReader to avoid reading the whole body just to compute the md5 sum:
	// it writes data to the hasher while the body is being read;
	// if the hash does not match, the saved chunk is deleted
	hasher := md5.New()
	r := io.TeeReader(ctx.Req.Body, hasher)
	// save chunk to storage
	writtenSize, err := st.Save(storagePath, r, -1)
	if err != nil {
		return -1, fmt.Errorf("save chunk to storage error: %v", err)
	}
	// check md5
	reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
	chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
	log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
	// if md5 does not match, delete the chunk
	if reqMd5String != chunkMd5String || writtenSize != contentSize {
		if err := st.Delete(storagePath); err != nil {
			log.Error("Error deleting chunk: %s, %v", storagePath, err)
		}
		return -1, fmt.Errorf("md5 not match")
	}
	log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
		storagePath, contentSize, artifact.ID, start, end)
	// return chunk total size
	return length, nil
}

type chunkFileItem struct {
	ArtifactID int64
	Start      int64
	End        int64
	Path       string
}

func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
	storageDir := fmt.Sprintf("tmp%d", runID)
	var chunks []*chunkFileItem
	if err := st.IterateObjects(storageDir, func(path string, obj storage.Object) error {
		item := chunkFileItem{Path: path}
		if _, err := fmt.Sscanf(path, storageDir+"/%d-%d-%d.chunk", &item.ArtifactID, &item.Start, &item.End); err != nil {
			return fmt.Errorf("parse content range error: %v", err)
		}
		chunks = append(chunks, &item)
		return nil
	}); err != nil {
		return nil, err
	}
	// group chunks by artifact id
	chunksMap := make(map[int64][]*chunkFileItem)
	for _, c := range chunks {
		chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
	}
	return chunksMap, nil
}

func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
	// read all db artifacts by name
	artifacts, err := actions.ListArtifactsByRunIDAndName(ctx, runID, artifactName)
	if err != nil {
		return err
	}
	// read all uploaded chunks from storage
	chunksMap, err := listChunksByRunID(st, runID)
	if err != nil {
		return err
	}
	// range over db artifacts to merge their chunks
	for _, art := range artifacts {
		chunks, ok := chunksMap[art.ID]
		if !ok {
			log.Debug("artifact %d chunks not found", art.ID)
			continue
		}
		if err := mergeChunksForArtifact(ctx, chunks, st, art); err != nil {
			return err
		}
	}
	return nil
}

func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact) error {
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].Start < chunks[j].Start
	})
	allChunks := make([]*chunkFileItem, 0)
	startAt := int64(-1)
	// check that all chunks are uploaded and in order, and skip repeated chunks
	for _, c := range chunks {
		// startAt == -1 means this is the first chunk;
		// a chunk is in order when its Start equals the previous chunk's End + 1;
		// any other Start means a chunk is missing or repeated
		if c.Start == (startAt + 1) {
			allChunks = append(allChunks, c)
			startAt = c.End
		}
	}
	// if the last chunk's End + 1 does not equal the expected compressed size, the chunks are not uploaded completely
	if startAt+1 != artifact.FileCompressedSize {
		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
		return nil
	}
	// use multiReader
	readers := make([]io.Reader, 0, len(allChunks))
	closeReaders := func() {
		for _, r := range readers {
			_ = r.(io.Closer).Close() // guaranteed to be an io.Closer by the Open call in the following loop
		}
		readers = nil
	}
	defer closeReaders()
	for _, c := range allChunks {
		var readCloser io.ReadCloser
		var err error
		if readCloser, err = st.Open(c.Path); err != nil {
			return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
		}
		readers = append(readers, readCloser)
	}
	mergedReader := io.MultiReader(readers...)

	// if the chunk data is gzip, use gz as the extension;
	// the download-artifact action uses the content-encoding header to decide whether to decompress the file
	extension := "chunk"
	if artifact.ContentEncoding == "gzip" {
		extension = "chunk.gz"
	}

	// save merged file
	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
	written, err := st.Save(storagePath, mergedReader, -1)
	if err != nil {
		return fmt.Errorf("save merged file error: %v", err)
	}
	if written != artifact.FileCompressedSize {
		return fmt.Errorf("merged file size is not equal to chunk length")
	}

	defer func() {
		closeReaders() // close before delete
		// drop chunks
		for _, c := range chunks {
			if err := st.Delete(c.Path); err != nil {
				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
			}
		}
	}()

	// save storage path to artifact
	log.Debug("[artifact] merge chunks to artifact: %d, %s", artifact.ID, storagePath)
	artifact.StoragePath = storagePath
	artifact.Status = actions.ArtifactStatusUploadConfirmed
	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
		return fmt.Errorf("update artifact error: %v", err)
	}

	return nil
}
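For orientation, here is a minimal sketch of the kind of chunk upload request that saveUploadChunk and getUploadFileSize parse: a Content-Range header of the form "bytes {start}-{end}/{total}", an x-actions-results-md5 header carrying the base64 md5 of the chunk body, and an x-tfs-filelength header with the client-reported file size. The endpoint URL and values below are placeholders, not the actual route added in this commit.

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"net/http"
)

func main() {
	chunk := []byte("hello artifact chunk") // one slice of the uploaded file (hypothetical)
	start, total := int64(0), int64(146515) // hypothetical offsets
	end := start + int64(len(chunk)) - 1

	sum := md5.Sum(chunk)
	// placeholder URL; the real upload route is defined elsewhere in the PR
	req, _ := http.NewRequest(http.MethodPut,
		"https://example.invalid/artifact-upload?itemPath=test-reports/unit/results.xml",
		bytes.NewReader(chunk))
	// headers read by saveUploadChunk and getUploadFileSize
	req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, total))
	req.Header.Set("x-actions-results-md5", base64.StdEncoding.EncodeToString(sum[:]))
	req.Header.Set("x-tfs-filelength", fmt.Sprint(total))
	fmt.Println(req.Header)
}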
@@ -0,0 +1,82 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
	"crypto/md5"
	"fmt"
	"net/http"
	"strconv"
	"strings"

	"code.gitea.io/gitea/models/actions"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/util"
)

const (
	artifactXTfsFileLengthHeader     = "x-tfs-filelength"
	artifactXActionsResultsMD5Header = "x-actions-results-md5"
)

// The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/path-and-artifact-name-validation.ts#L32
var invalidArtifactNameChars = strings.Join([]string{"\\", "/", "\"", ":", "<", ">", "|", "*", "?", "\r", "\n"}, "")

func validateArtifactName(ctx *ArtifactContext, artifactName string) bool {
	if strings.ContainsAny(artifactName, invalidArtifactNameChars) {
		log.Error("Error checking artifact name contains invalid character")
		ctx.Error(http.StatusBadRequest, "Error checking artifact name contains invalid character")
		return false
	}
	return true
}

func validateRunID(ctx *ArtifactContext) (*actions.ActionTask, int64, bool) {
	task := ctx.ActionTask
	runID := ctx.ParamsInt64("run_id")
	if task.Job.RunID != runID {
		log.Error("Error runID not match")
		ctx.Error(http.StatusBadRequest, "run-id does not match")
		return nil, 0, false
	}
	return task, runID, true
}

func validateArtifactHash(ctx *ArtifactContext, artifactName string) bool {
	paramHash := ctx.Params("artifact_hash")
	// the artifact name is used to create the upload url
	artifactHash := fmt.Sprintf("%x", md5.Sum([]byte(artifactName)))
	if paramHash == artifactHash {
		return true
	}
	log.Error("Invalid artifact hash: %s", paramHash)
	ctx.Error(http.StatusBadRequest, "Invalid artifact hash")
	return false
}

func parseArtifactItemPath(ctx *ArtifactContext) (string, string, bool) {
	// itemPath is generated by the upload-artifact action;
	// it is formatted as {artifact_name}/{artifact_path_in_runner}
	itemPath := util.PathJoinRel(ctx.Req.URL.Query().Get("itemPath"))
	artifactName := strings.Split(itemPath, "/")[0]
	artifactPath := strings.TrimPrefix(itemPath, artifactName+"/")
	if !validateArtifactHash(ctx, artifactName) {
		return "", "", false
	}
	if !validateArtifactName(ctx, artifactName) {
		return "", "", false
	}
	return artifactName, artifactPath, true
}

// getUploadFileSize returns the size of the file to be uploaded.
// The raw size is the size of the file as reported by the header X-TFS-FileLength.
func getUploadFileSize(ctx *ArtifactContext) (int64, int64, error) {
	contentLength := ctx.Req.ContentLength
	xTfsLength, _ := strconv.ParseInt(ctx.Req.Header.Get(artifactXTfsFileLengthHeader), 10, 64)
	if xTfsLength > 0 {
		return xTfsLength, contentLength, nil
	}
	return contentLength, contentLength, nil
}
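A short sketch (illustrative values only) of the itemPath convention these helpers enforce: the first path segment is the artifact name, the rest is the path inside the artifact, and the artifact_hash route parameter is expected to be the hex md5 of the artifact name, matching validateArtifactHash above.

package main

import (
	"crypto/md5"
	"fmt"
	"strings"
)

func main() {
	// hypothetical itemPath as sent by the upload-artifact action
	itemPath := "test-reports/unit/results.xml"
	artifactName := strings.Split(itemPath, "/")[0]                // "test-reports"
	artifactPath := strings.TrimPrefix(itemPath, artifactName+"/") // "unit/results.xml"

	// the {artifact_hash} URL segment must equal the hex md5 of the artifact name
	artifactHash := fmt.Sprintf("%x", md5.Sum([]byte(artifactName)))
	fmt.Println(artifactName, artifactPath, artifactHash)
}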