Fix schedule tasks bugs (#28691)

Fix #28157 

This PR fix the possible bugs about actions schedule.

## The Changes

- Move `UpdateRepositoryUnit` and `SetRepoDefaultBranch` from models to
service layer
- Remove schedules plan from database and cancel waiting & running
schedules tasks in this repository when actions unit has been disabled
or global disabled.
- Remove schedules plan from database and cancel waiting & running
schedules tasks in this repository when default branch changed.
pull/28546/head^2
Lunny Xiao 10 months ago committed by GitHub
parent 6c6823935c
commit 97292da960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      models/actions/run.go
  2. 5
      models/actions/run_list.go
  3. 20
      models/actions/schedule.go
  4. 4
      models/git/branch.go
  5. 3
      models/git/branch_test.go
  6. 26
      models/repo/repo_unit.go
  7. 4
      modules/actions/github.go
  8. 24
      modules/actions/workflows.go
  9. 7
      modules/actions/workflows_test.go
  10. 1
      modules/webhook/type.go
  11. 2
      routers/api/v1/repo/repo.go
  12. 26
      routers/web/repo/setting/default_branch.go
  13. 2
      routers/web/repo/setting/setting.go
  14. 35
      services/actions/notifier_helper.go
  15. 2
      services/actions/schedule_tasks.go
  16. 66
      services/repository/branch.go
  17. 47
      services/repository/setting.go
  18. 3
      services/wiki/wiki.go
  19. 4
      tests/integration/actions_trigger_test.go

@ -168,13 +168,14 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
}
// CancelRunningJobs cancels all running and waiting jobs associated with a specific workflow.
func CancelRunningJobs(ctx context.Context, repoID int64, ref, workflowID string) error {
func CancelRunningJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
// Find all runs in the specified repository, reference, and workflow with statuses 'Running' or 'Waiting'.
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
Status: []Status{StatusRunning, StatusWaiting},
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
TriggerEvent: event,
Status: []Status{StatusRunning, StatusWaiting},
})
if err != nil {
return err

@ -10,6 +10,7 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/container"
webhook_module "code.gitea.io/gitea/modules/webhook"
"xorm.io/builder"
)
@ -71,6 +72,7 @@ type FindRunOptions struct {
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
}
@ -98,6 +100,9 @@ func (opts FindRunOptions) ToConds() builder.Cond {
if opts.Ref != "" {
cond = cond.And(builder.Eq{"ref": opts.Ref})
}
if opts.TriggerEvent != "" {
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
}
return cond
}

@ -5,6 +5,7 @@ package actions
import (
"context"
"fmt"
"time"
"code.gitea.io/gitea/models/db"
@ -118,3 +119,22 @@ func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
return committer.Commit()
}
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error {
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
// There is no other place we can do this because the app.ini will be changed manually
if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
if err := CancelRunningJobs(
ctx,
repo.ID,
repo.DefaultBranch,
"",
webhook_module.HookEventSchedule,
); err != nil {
return fmt.Errorf("CancelRunningJobs: %v", err)
}
return nil
}

@ -283,7 +283,7 @@ func FindRenamedBranch(ctx context.Context, repoID int64, from string) (branch *
}
// RenameBranch rename a branch
func RenameBranch(ctx context.Context, repo *repo_model.Repository, from, to string, gitAction func(isDefault bool) error) (err error) {
func RenameBranch(ctx context.Context, repo *repo_model.Repository, from, to string, gitAction func(ctx context.Context, isDefault bool) error) (err error) {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
@ -358,7 +358,7 @@ func RenameBranch(ctx context.Context, repo *repo_model.Repository, from, to str
}
// 5. do git action
if err = gitAction(isDefault); err != nil {
if err = gitAction(ctx, isDefault); err != nil {
return err
}

@ -4,6 +4,7 @@
package git_test
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
@ -132,7 +133,7 @@ func TestRenameBranch(t *testing.T) {
}, git_model.WhitelistOptions{}))
assert.NoError(t, committer.Commit())
assert.NoError(t, git_model.RenameBranch(db.DefaultContext, repo1, "master", "main", func(isDefault bool) error {
assert.NoError(t, git_model.RenameBranch(db.DefaultContext, repo1, "master", "main", func(ctx context.Context, isDefault bool) error {
_isDefault = isDefault
return nil
}))

@ -283,29 +283,3 @@ func UpdateRepoUnit(ctx context.Context, unit *RepoUnit) error {
_, err := db.GetEngine(ctx).ID(unit.ID).Update(unit)
return err
}
// UpdateRepositoryUnits updates a repository's units
func UpdateRepositoryUnits(ctx context.Context, repo *Repository, units []RepoUnit, deleteUnitTypes []unit.Type) (err error) {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()
// Delete existing settings of units before adding again
for _, u := range units {
deleteUnitTypes = append(deleteUnitTypes, u.Type)
}
if _, err = db.GetEngine(ctx).Where("repo_id = ?", repo.ID).In("type", deleteUnitTypes).Delete(new(RepoUnit)); err != nil {
return err
}
if len(units) > 0 {
if err = db.Insert(ctx, units); err != nil {
return err
}
}
return committer.Commit()
}

@ -22,6 +22,7 @@ const (
GithubEventRelease = "release"
GithubEventPullRequestComment = "pull_request_comment"
GithubEventGollum = "gollum"
GithubEventSchedule = "schedule"
)
// canGithubEventMatch check if the input Github event can match any Gitea event.
@ -69,6 +70,9 @@ func canGithubEventMatch(eventName string, triggedEvent webhook_module.HookEvent
return false
}
case GithubEventSchedule:
return triggedEvent == webhook_module.HookEventSchedule
default:
return eventName == string(triggedEvent)
}

@ -22,7 +22,7 @@ import (
type DetectedWorkflow struct {
EntryName string
TriggerEvent string
TriggerEvent *jobparser.Event
Content []byte
}
@ -100,6 +100,7 @@ func DetectWorkflows(
commit *git.Commit,
triggedEvent webhook_module.HookEventType,
payload api.Payloader,
detectSchedule bool,
) ([]*DetectedWorkflow, []*DetectedWorkflow, error) {
entries, err := ListWorkflows(commit)
if err != nil {
@ -114,6 +115,7 @@ func DetectWorkflows(
return nil, nil, err
}
// one workflow may have multiple events
events, err := GetEventsFromContent(content)
if err != nil {
log.Warn("ignore invalid workflow %q: %v", entry.Name(), err)
@ -122,17 +124,18 @@ func DetectWorkflows(
for _, evt := range events {
log.Trace("detect workflow %q for event %#v matching %q", entry.Name(), evt, triggedEvent)
if evt.IsSchedule() {
dwf := &DetectedWorkflow{
EntryName: entry.Name(),
TriggerEvent: evt.Name,
Content: content,
if detectSchedule {
dwf := &DetectedWorkflow{
EntryName: entry.Name(),
TriggerEvent: evt,
Content: content,
}
schedules = append(schedules, dwf)
}
schedules = append(schedules, dwf)
}
if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
} else if detectMatched(gitRepo, commit, triggedEvent, payload, evt) {
dwf := &DetectedWorkflow{
EntryName: entry.Name(),
TriggerEvent: evt.Name,
TriggerEvent: evt,
Content: content,
}
workflows = append(workflows, dwf)
@ -153,7 +156,8 @@ func detectMatched(gitRepo *git.Repository, commit *git.Commit, triggedEvent web
webhook_module.HookEventCreate,
webhook_module.HookEventDelete,
webhook_module.HookEventFork,
webhook_module.HookEventWiki:
webhook_module.HookEventWiki,
webhook_module.HookEventSchedule:
if len(evt.Acts()) != 0 {
log.Warn("Ignore unsupported %s event arguments %v", triggedEvent, evt.Acts())
}

@ -118,6 +118,13 @@ func TestDetectMatched(t *testing.T) {
yamlOn: "on: gollum",
expected: true,
},
{
desc: "HookEventSchedue(schedule) matches GithubEventSchedule(schedule)",
triggedEvent: webhook_module.HookEventSchedule,
payload: nil,
yamlOn: "on: schedule",
expected: true,
},
}
for _, tc := range testCases {

@ -31,6 +31,7 @@ const (
HookEventRepository HookEventType = "repository"
HookEventRelease HookEventType = "release"
HookEventPackage HookEventType = "package"
HookEventSchedule HookEventType = "schedule"
)
// Event returns the HookEventType as an event string

@ -983,7 +983,7 @@ func updateRepoUnits(ctx *context.APIContext, opts api.EditRepoOption) error {
}
if len(units)+len(deleteUnitTypes) > 0 {
if err := repo_model.UpdateRepositoryUnits(ctx, repo, units, deleteUnitTypes); err != nil {
if err := repo_service.UpdateRepositoryUnits(ctx, repo, units, deleteUnitTypes); err != nil {
ctx.Error(http.StatusInternalServerError, "UpdateRepositoryUnits", err)
return err
}

@ -6,13 +6,12 @@ package setting
import (
"net/http"
repo_model "code.gitea.io/gitea/models/repo"
git_model "code.gitea.io/gitea/models/git"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/routers/web/repo"
notify_service "code.gitea.io/gitea/services/notify"
repo_service "code.gitea.io/gitea/services/repository"
)
// SetDefaultBranchPost set default branch
@ -35,23 +34,14 @@ func SetDefaultBranchPost(ctx *context.Context) {
}
branch := ctx.FormString("branch")
if !ctx.Repo.GitRepo.IsBranchExist(branch) {
ctx.Status(http.StatusNotFound)
return
} else if repo.DefaultBranch != branch {
repo.DefaultBranch = branch
if err := ctx.Repo.GitRepo.SetDefaultBranch(branch); err != nil {
if !git.IsErrUnsupportedVersion(err) {
ctx.ServerError("SetDefaultBranch", err)
return
}
}
if err := repo_model.UpdateDefaultBranch(ctx, repo); err != nil {
if err := repo_service.SetRepoDefaultBranch(ctx, ctx.Repo.Repository, ctx.Repo.GitRepo, branch); err != nil {
switch {
case git_model.IsErrBranchNotExist(err):
ctx.Status(http.StatusNotFound)
default:
ctx.ServerError("SetDefaultBranch", err)
return
}
notify_service.ChangeDefaultBranch(ctx, repo)
return
}
log.Trace("Repository basic settings updated: %s/%s", ctx.Repo.Owner.Name, repo.Name)

@ -594,7 +594,7 @@ func SettingsPost(ctx *context.Context) {
return
}
if err := repo_model.UpdateRepositoryUnits(ctx, repo, units, deleteUnitTypes); err != nil {
if err := repo_service.UpdateRepositoryUnits(ctx, repo, units, deleteUnitTypes); err != nil {
ctx.ServerError("UpdateRepositoryUnits", err)
return
}

@ -117,6 +117,9 @@ func notify(ctx context.Context, input *notifyInput) error {
return nil
}
if unit_model.TypeActions.UnitGlobalDisabled() {
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err)
}
return nil
}
if err := input.Repo.LoadUnits(ctx); err != nil {
@ -153,7 +156,11 @@ func notify(ctx context.Context, input *notifyInput) error {
var detectedWorkflows []*actions_module.DetectedWorkflow
actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig()
workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload)
workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit,
input.Event,
input.Payload,
input.Event == webhook_module.HookEventPush && input.Ref == input.Repo.DefaultBranch,
)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
@ -167,7 +174,7 @@ func notify(ctx context.Context, input *notifyInput) error {
continue
}
if wf.TriggerEvent != actions_module.GithubEventPullRequestTarget {
if wf.TriggerEvent.Name != actions_module.GithubEventPullRequestTarget {
detectedWorkflows = append(detectedWorkflows, wf)
}
}
@ -180,7 +187,7 @@ func notify(ctx context.Context, input *notifyInput) error {
if err != nil {
return fmt.Errorf("gitRepo.GetCommit: %w", err)
}
baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload)
baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload, false)
if err != nil {
return fmt.Errorf("DetectWorkflows: %w", err)
}
@ -188,7 +195,7 @@ func notify(ctx context.Context, input *notifyInput) error {
log.Trace("repo %s with commit %s couldn't find pull_request_target workflows", input.Repo.RepoPath(), baseCommit.ID)
} else {
for _, wf := range baseWorkflows {
if wf.TriggerEvent == actions_module.GithubEventPullRequestTarget {
if wf.TriggerEvent.Name == actions_module.GithubEventPullRequestTarget {
detectedWorkflows = append(detectedWorkflows, wf)
}
}
@ -265,7 +272,7 @@ func handleWorkflows(
IsForkPullRequest: isForkPullRequest,
Event: input.Event,
EventPayload: string(p),
TriggerEvent: dwf.TriggerEvent,
TriggerEvent: dwf.TriggerEvent.Name,
Status: actions_model.StatusWaiting,
}
if need, err := ifNeedApproval(ctx, run, input.Repo, input.Doer); err != nil {
@ -289,6 +296,7 @@ func handleWorkflows(
run.RepoID,
run.Ref,
run.WorkflowID,
run.Event,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
@ -414,8 +422,8 @@ func handleSchedules(
log.Error("CountSchedules: %v", err)
return err
} else if count > 0 {
if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil {
log.Error("DeleteCronTaskByRepo: %v", err)
if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err)
}
}
@ -456,19 +464,6 @@ func handleSchedules(
Specs: schedules,
Content: dwf.Content,
}
// cancel running jobs if the event is push
if run.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow
if err := actions_model.CancelRunningJobs(
ctx,
run.RepoID,
run.Ref,
run.WorkflowID,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
}
crons = append(crons, run)
}

@ -59,6 +59,7 @@ func startTasks(ctx context.Context) error {
row.RepoID,
row.Schedule.Ref,
row.Schedule.WorkflowID,
webhook_module.HookEventSchedule,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
@ -113,6 +114,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
CommitSHA: cron.CommitSHA,
Event: cron.Event,
EventPayload: cron.EventPayload,
TriggerEvent: string(webhook_module.HookEventSchedule),
ScheduleID: cron.ID,
Status: actions_model.StatusWaiting,
}

@ -10,6 +10,7 @@ import (
"strings"
"code.gitea.io/gitea/models"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
git_model "code.gitea.io/gitea/models/git"
issues_model "code.gitea.io/gitea/models/issues"
@ -22,6 +23,7 @@ import (
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"
notify_service "code.gitea.io/gitea/services/notify"
files_service "code.gitea.io/gitea/services/repository/files"
@ -308,13 +310,28 @@ func RenameBranch(ctx context.Context, repo *repo_model.Repository, doer *user_m
return "from_not_exist", nil
}
if err := git_model.RenameBranch(ctx, repo, from, to, func(isDefault bool) error {
if err := git_model.RenameBranch(ctx, repo, from, to, func(ctx context.Context, isDefault bool) error {
err2 := gitRepo.RenameBranch(from, to)
if err2 != nil {
return err2
}
if isDefault {
// if default branch changed, we need to delete all schedules and cron jobs
if err := actions_model.DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
log.Error("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
if err := actions_model.CancelRunningJobs(
ctx,
repo.ID,
from,
"",
webhook_module.HookEventSchedule,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
err2 = gitRepo.SetDefaultBranch(to)
if err2 != nil {
return err2
@ -450,3 +467,50 @@ func AddAllRepoBranchesToSyncQueue(ctx context.Context, doerID int64) error {
}
return nil
}
func SetRepoDefaultBranch(ctx context.Context, repo *repo_model.Repository, gitRepo *git.Repository, newBranchName string) error {
if repo.DefaultBranch == newBranchName {
return nil
}
if !gitRepo.IsBranchExist(newBranchName) {
return git_model.ErrBranchNotExist{
BranchName: newBranchName,
}
}
oldDefaultBranchName := repo.DefaultBranch
repo.DefaultBranch = newBranchName
if err := db.WithTx(ctx, func(ctx context.Context) error {
if err := repo_model.UpdateDefaultBranch(ctx, repo); err != nil {
return err
}
if err := actions_model.DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
log.Error("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
if err := actions_model.CancelRunningJobs(
ctx,
repo.ID,
oldDefaultBranchName,
"",
webhook_module.HookEventSchedule,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
if err := gitRepo.SetDefaultBranch(newBranchName); err != nil {
if !git.IsErrUnsupportedVersion(err) {
return err
}
}
return nil
}); err != nil {
return err
}
notify_service.ChangeDefaultBranch(ctx, repo)
return nil
}

@ -0,0 +1,47 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package repository
import (
"context"
"slices"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit"
"code.gitea.io/gitea/modules/log"
)
// UpdateRepositoryUnits updates a repository's units
func UpdateRepositoryUnits(ctx context.Context, repo *repo_model.Repository, units []repo_model.RepoUnit, deleteUnitTypes []unit.Type) (err error) {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()
// Delete existing settings of units before adding again
for _, u := range units {
deleteUnitTypes = append(deleteUnitTypes, u.Type)
}
if slices.Contains(deleteUnitTypes, unit.TypeActions) {
if err := actions_model.CleanRepoScheduleTasks(ctx, repo); err != nil {
log.Error("CleanRepoScheduleTasks: %v", err)
}
}
if _, err = db.GetEngine(ctx).Where("repo_id = ?", repo.ID).In("type", deleteUnitTypes).Delete(new(repo_model.RepoUnit)); err != nil {
return err
}
if len(units) > 0 {
if err = db.Insert(ctx, units); err != nil {
return err
}
}
return committer.Commit()
}

@ -19,6 +19,7 @@ import (
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/sync"
asymkey_service "code.gitea.io/gitea/services/asymkey"
repo_service "code.gitea.io/gitea/services/repository"
)
// TODO: use clustered lock (unique queue? or *abuse* cache)
@ -350,7 +351,7 @@ func DeleteWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
// DeleteWiki removes the actual and local copy of repository wiki.
func DeleteWiki(ctx context.Context, repo *repo_model.Repository) error {
if err := repo_model.UpdateRepositoryUnits(ctx, repo, nil, []unit.Type{unit.TypeWiki}); err != nil {
if err := repo_service.UpdateRepositoryUnits(ctx, repo, nil, []unit.Type{unit.TypeWiki}); err != nil {
return err
}

@ -47,7 +47,7 @@ func TestPullRequestTargetEvent(t *testing.T) {
assert.NotEmpty(t, baseRepo)
// enable actions
err = repo_model.UpdateRepositoryUnits(db.DefaultContext, baseRepo, []repo_model.RepoUnit{{
err = repo_service.UpdateRepositoryUnits(db.DefaultContext, baseRepo, []repo_model.RepoUnit{{
RepoID: baseRepo.ID,
Type: unit_model.TypeActions,
}}, nil)
@ -216,7 +216,7 @@ func TestSkipCI(t *testing.T) {
assert.NotEmpty(t, repo)
// enable actions
err = repo_model.UpdateRepositoryUnits(db.DefaultContext, repo, []repo_model.RepoUnit{{
err = repo_service.UpdateRepositoryUnits(db.DefaultContext, repo, []repo_model.RepoUnit{{
RepoID: repo.ID,
Type: unit_model.TypeActions,
}}, nil)

Loading…
Cancel
Save