package workflow import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/pocketbase/dbx" "github.com/usual2970/certimate/internal/app" "github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/domain/dtos" "github.com/usual2970/certimate/internal/workflow/dispatcher" ) type workflowRepository interface { ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) GetById(ctx context.Context, id string) (*domain.Workflow, error) Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error) } type workflowRunRepository interface { GetById(ctx context.Context, id string) (*domain.WorkflowRun, error) Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error) DeleteWhere(ctx context.Context, exprs ...dbx.Expression) (int, error) } type settingsRepository interface { GetByName(ctx context.Context, name string) (*domain.Settings, error) } type WorkflowService struct { dispatcher *dispatcher.WorkflowDispatcher workflowRepo workflowRepository workflowRunRepo workflowRunRepository settingsRepo settingsRepository } func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository, settingsRepo settingsRepository) *WorkflowService { srv := &WorkflowService{ dispatcher: dispatcher.GetSingletonDispatcher(), workflowRepo: workflowRepo, workflowRunRepo: workflowRunRepo, settingsRepo: settingsRepo, } return srv } func (s *WorkflowService) InitSchedule(ctx context.Context) error { // 每日清理工作流执行历史 app.GetScheduler().MustAdd("workflowHistoryRunsCleanup", "0 0 * * *", func() { settings, err := s.settingsRepo.GetByName(ctx, "persistence") if err != nil { app.GetLogger().Error("failed to get persistence settings", "err", err) return } var settingsContent *domain.PersistenceSettingsContent json.Unmarshal([]byte(settings.Content), &settingsContent) if settingsContent != nil && settingsContent.WorkflowRunsMaxDaysRetention != 0 { ret, err := s.workflowRunRepo.DeleteWhere( context.Background(), dbx.NewExp(fmt.Sprintf("status!='%s'", string(domain.WorkflowRunStatusTypePending))), dbx.NewExp(fmt.Sprintf("status!='%s'", string(domain.WorkflowRunStatusTypeRunning))), dbx.NewExp(fmt.Sprintf("endedAt 0 { app.GetLogger().Info(fmt.Sprintf("cleanup %d workflow history runs", ret)) } } }) // 工作流 { workflows, err := s.workflowRepo.ListEnabledAuto(ctx) if err != nil { return err } for _, workflow := range workflows { var errs []error err := app.GetScheduler().Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() { s.StartRun(ctx, &dtos.WorkflowStartRunReq{ WorkflowId: workflow.Id, RunTrigger: domain.WorkflowTriggerTypeAuto, }) }) if err != nil { errs = append(errs, err) } if len(errs) > 0 { return errors.Join(errs...) } } } return nil } func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error { workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) if err != nil { return err } if workflow.LastRunStatus == domain.WorkflowRunStatusTypePending || workflow.LastRunStatus == domain.WorkflowRunStatusTypeRunning { return errors.New("workflow is already pending or running") } run := &domain.WorkflowRun{ WorkflowId: workflow.Id, Status: domain.WorkflowRunStatusTypePending, Trigger: req.RunTrigger, StartedAt: time.Now(), Detail: workflow.Content, } if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil { return err } else { run = resp } s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{ WorkflowId: run.WorkflowId, WorkflowContent: run.Detail, RunId: run.Id, }) return nil } func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error { workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) if err != nil { return err } workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId) if err != nil { return err } else if workflowRun.WorkflowId != workflow.Id { return errors.New("workflow run not found") } else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning { return errors.New("workflow run is not pending or running") } s.dispatcher.Cancel(workflowRun.Id) return nil } func (s *WorkflowService) Shutdown(ctx context.Context) { s.dispatcher.Shutdown() }