mirror of
https://github.com/usual2970/certimate.git
synced 2025-06-08 13:39:53 +00:00
165 lines
4.7 KiB
Go
165 lines
4.7 KiB
Go
package workflow
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/pocketbase/dbx"
|
|
|
|
"github.com/usual2970/certimate/internal/app"
|
|
"github.com/usual2970/certimate/internal/domain"
|
|
"github.com/usual2970/certimate/internal/domain/dtos"
|
|
"github.com/usual2970/certimate/internal/workflow/dispatcher"
|
|
)
|
|
|
|
type workflowRepository interface {
|
|
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
|
|
GetById(ctx context.Context, id string) (*domain.Workflow, error)
|
|
Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error)
|
|
}
|
|
|
|
type workflowRunRepository interface {
|
|
GetById(ctx context.Context, id string) (*domain.WorkflowRun, error)
|
|
Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
|
|
DeleteWhere(ctx context.Context, exprs ...dbx.Expression) (int, error)
|
|
}
|
|
|
|
type settingsRepository interface {
|
|
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
|
}
|
|
|
|
type WorkflowService struct {
|
|
dispatcher *dispatcher.WorkflowDispatcher
|
|
|
|
workflowRepo workflowRepository
|
|
workflowRunRepo workflowRunRepository
|
|
settingsRepo settingsRepository
|
|
}
|
|
|
|
func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository, settingsRepo settingsRepository) *WorkflowService {
|
|
srv := &WorkflowService{
|
|
dispatcher: dispatcher.GetSingletonDispatcher(),
|
|
|
|
workflowRepo: workflowRepo,
|
|
workflowRunRepo: workflowRunRepo,
|
|
settingsRepo: settingsRepo,
|
|
}
|
|
return srv
|
|
}
|
|
|
|
func (s *WorkflowService) InitSchedule(ctx context.Context) error {
|
|
// 每日清理工作流执行历史
|
|
app.GetScheduler().MustAdd("workflowHistoryRunsCleanup", "0 0 * * *", func() {
|
|
settings, err := s.settingsRepo.GetByName(ctx, "persistence")
|
|
if err != nil {
|
|
app.GetLogger().Error("failed to get persistence settings", "err", err)
|
|
return
|
|
}
|
|
|
|
var settingsContent *domain.PersistenceSettingsContent
|
|
json.Unmarshal([]byte(settings.Content), &settingsContent)
|
|
if settingsContent != nil && settingsContent.WorkflowRunsMaxDaysRetention != 0 {
|
|
ret, err := s.workflowRunRepo.DeleteWhere(
|
|
context.Background(),
|
|
dbx.NewExp(fmt.Sprintf("status!='%s'", string(domain.WorkflowRunStatusTypePending))),
|
|
dbx.NewExp(fmt.Sprintf("status!='%s'", string(domain.WorkflowRunStatusTypeRunning))),
|
|
dbx.NewExp(fmt.Sprintf("endedAt<DATETIME('now', '-%d days')", settingsContent.WorkflowRunsMaxDaysRetention)),
|
|
)
|
|
if err != nil {
|
|
app.GetLogger().Error("failed to delete workflow history runs", "err", err)
|
|
}
|
|
|
|
if ret > 0 {
|
|
app.GetLogger().Info(fmt.Sprintf("cleanup %d workflow history runs", ret))
|
|
}
|
|
}
|
|
})
|
|
|
|
// 工作流
|
|
{
|
|
workflows, err := s.workflowRepo.ListEnabledAuto(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, workflow := range workflows {
|
|
var errs []error
|
|
|
|
err := app.GetScheduler().Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
|
|
s.StartRun(ctx, &dtos.WorkflowStartRunReq{
|
|
WorkflowId: workflow.Id,
|
|
RunTrigger: domain.WorkflowTriggerTypeAuto,
|
|
})
|
|
})
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
|
|
if len(errs) > 0 {
|
|
return errors.Join(errs...)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error {
|
|
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if workflow.LastRunStatus == domain.WorkflowRunStatusTypePending || workflow.LastRunStatus == domain.WorkflowRunStatusTypeRunning {
|
|
return errors.New("workflow is already pending or running")
|
|
}
|
|
|
|
run := &domain.WorkflowRun{
|
|
WorkflowId: workflow.Id,
|
|
Status: domain.WorkflowRunStatusTypePending,
|
|
Trigger: req.RunTrigger,
|
|
StartedAt: time.Now(),
|
|
Detail: workflow.Content,
|
|
}
|
|
if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil {
|
|
return err
|
|
} else {
|
|
run = resp
|
|
}
|
|
|
|
s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{
|
|
WorkflowId: run.WorkflowId,
|
|
WorkflowContent: run.Detail,
|
|
RunId: run.Id,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
|
|
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
|
|
if err != nil {
|
|
return err
|
|
} else if workflowRun.WorkflowId != workflow.Id {
|
|
return errors.New("workflow run not found")
|
|
} else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
|
|
return errors.New("workflow run is not pending or running")
|
|
}
|
|
|
|
s.dispatcher.Cancel(workflowRun.Id)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *WorkflowService) Shutdown(ctx context.Context) {
|
|
s.dispatcher.Shutdown()
|
|
}
|