From 0bc40fd6766625660a0bfeedb482eeae052ca389 Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Sat, 8 Feb 2025 22:44:13 +0800 Subject: [PATCH] feat: workflow run dispatcher --- internal/repository/workflow_run.go | 7 + internal/rest/handlers/workflow.go | 2 +- internal/rest/routes/routes.go | 2 +- internal/workflow/dispatcher/dispatcher.go | 274 ++++++++++++++++++ internal/workflow/dispatcher/invoker.go | 104 +++++++ internal/workflow/dispatcher/singleton.go | 31 ++ .../workflow/node-processor/apply_node.go | 3 +- internal/workflow/processor/processor.go | 90 ------ internal/workflow/service.go | 147 +++------- ui/src/components/workflow/WorkflowRuns.tsx | 4 +- ui/src/pages/dashboard/Dashboard.tsx | 5 +- ui/src/pages/workflows/WorkflowDetail.tsx | 25 +- ui/src/pages/workflows/WorkflowList.tsx | 4 +- 13 files changed, 472 insertions(+), 226 deletions(-) create mode 100644 internal/workflow/dispatcher/dispatcher.go create mode 100644 internal/workflow/dispatcher/invoker.go create mode 100644 internal/workflow/dispatcher/singleton.go delete mode 100644 internal/workflow/processor/processor.go diff --git a/internal/repository/workflow_run.go b/internal/repository/workflow_run.go index 01185a45..b1a5234b 100644 --- a/internal/repository/workflow_run.go +++ b/internal/repository/workflow_run.go @@ -69,6 +69,13 @@ func (r *WorkflowRunRepository) Save(ctx context.Context, workflowRun *domain.Wo workflowRecord, err := txApp.FindRecordById(domain.CollectionNameWorkflow, workflowRun.WorkflowId) if err != nil { return err + } else if workflowRun.Id == workflowRecord.GetString("lastRunId") { + workflowRecord.IgnoreUnchangedFields(true) + workflowRecord.Set("lastRunStatus", record.GetString("status")) + err = txApp.Save(workflowRecord) + if err != nil { + return err + } } else if workflowRecord.GetDateTime("lastRunTime").Time().IsZero() || workflowRun.StartedAt.After(workflowRecord.GetDateTime("lastRunTime").Time()) { workflowRecord.IgnoreUnchangedFields(true) workflowRecord.Set("lastRunId", record.Id) diff --git a/internal/rest/handlers/workflow.go b/internal/rest/handlers/workflow.go index 83b3302b..bad474f0 100644 --- a/internal/rest/handlers/workflow.go +++ b/internal/rest/handlers/workflow.go @@ -13,7 +13,7 @@ import ( type workflowService interface { StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error - Stop(ctx context.Context) + Shutdown(ctx context.Context) } type WorkflowHandler struct { diff --git a/internal/rest/routes/routes.go b/internal/rest/routes/routes.go index 756760da..87fcb297 100644 --- a/internal/rest/routes/routes.go +++ b/internal/rest/routes/routes.go @@ -46,6 +46,6 @@ func Register(router *router.Router[*core.RequestEvent]) { func Unregister() { if workflowSvc != nil { - workflowSvc.Stop(context.Background()) + workflowSvc.Shutdown(context.Background()) } } diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go new file mode 100644 index 00000000..0ecc0828 --- /dev/null +++ b/internal/workflow/dispatcher/dispatcher.go @@ -0,0 +1,274 @@ +package dispatcher + +import ( + "context" + "errors" + "fmt" + "os" + "strconv" + "sync" + "time" + + "github.com/usual2970/certimate/internal/app" + "github.com/usual2970/certimate/internal/domain" + "github.com/usual2970/certimate/internal/pkg/utils/slices" +) + +var maxWorkers = 16 + +func init() { + envMaxWorkers := os.Getenv("CERTIMATE_WORKFLOW_MAX_WORKERS") + if n, err := strconv.Atoi(envMaxWorkers); err != nil && n > 0 { + maxWorkers = n + } +} + +type workflowWorker struct { + Data *WorkflowWorkerData + Cancel context.CancelFunc +} + +type WorkflowWorkerData struct { + WorkflowId string + WorkflowContent *domain.WorkflowNode + RunId string +} + +type WorkflowDispatcher struct { + semaphore chan struct{} + + queue []*WorkflowWorkerData + queueMutex sync.Mutex + + workers map[string]*workflowWorker // key: WorkflowId + workerIdMap map[string]string // key: RunId, value: WorkflowId + workerMutex sync.Mutex + + chWork chan *WorkflowWorkerData + chCandi chan struct{} + + wg sync.WaitGroup + + workflowRepo workflowRepository + workflowRunRepo workflowRunRepository +} + +func newWorkflowDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher { + dispatcher := &WorkflowDispatcher{ + semaphore: make(chan struct{}, maxWorkers), + + queue: make([]*WorkflowWorkerData, 0), + queueMutex: sync.Mutex{}, + + workers: make(map[string]*workflowWorker), + workerIdMap: make(map[string]string), + workerMutex: sync.Mutex{}, + + chWork: make(chan *WorkflowWorkerData), + chCandi: make(chan struct{}, 1), + + workflowRepo: workflowRepo, + workflowRunRepo: workflowRunRepo, + } + + go func() { + for { + select { + case <-dispatcher.chWork: + dispatcher.dequeueWorker() + + case <-dispatcher.chCandi: + dispatcher.dequeueWorker() + } + } + }() + + return dispatcher +} + +func (w *WorkflowDispatcher) Dispatch(data *WorkflowWorkerData) { + if data == nil { + panic("worker data is nil") + } + + w.enqueueWorker(data) + select { + case w.chWork <- data: + default: + } +} + +func (w *WorkflowDispatcher) Cancel(runId string) { + hasWorker := false + + // 取消正在执行的 WorkflowRun + w.workerMutex.Lock() + if workflowId, ok := w.workerIdMap[runId]; ok { + if worker, ok := w.workers[workflowId]; ok { + hasWorker = true + worker.Cancel() + delete(w.workers, workflowId) + delete(w.workerIdMap, runId) + } + } + w.workerMutex.Unlock() + + // 移除排队中的 WorkflowRun + w.queueMutex.Lock() + w.queue = slices.Filter(w.queue, func(d *WorkflowWorkerData) bool { + return d.RunId != runId + }) + w.queueMutex.Unlock() + + // 已挂起,查询 WorkflowRun 并更新其状态为 Canceled + if !hasWorker { + if run, err := w.workflowRunRepo.GetById(context.Background(), runId); err == nil { + if run.Status == domain.WorkflowRunStatusTypePending || run.Status == domain.WorkflowRunStatusTypeRunning { + run.Status = domain.WorkflowRunStatusTypeCanceled + w.workflowRunRepo.Save(context.Background(), run) + } + } + } +} + +func (w *WorkflowDispatcher) Shutdown() { + // 清空排队中的 WorkflowRun + w.queueMutex.Lock() + w.queue = make([]*WorkflowWorkerData, 0) + w.queueMutex.Unlock() + + // 等待所有正在执行的 WorkflowRun 完成 + w.wg.Wait() + w.workers = make(map[string]*workflowWorker) + w.workerIdMap = make(map[string]string) +} + +func (w *WorkflowDispatcher) enqueueWorker(data *WorkflowWorkerData) { + w.queueMutex.Lock() + defer w.queueMutex.Unlock() + w.queue = append(w.queue, data) +} + +func (w *WorkflowDispatcher) dequeueWorker() { + for { + select { + case w.semaphore <- struct{}{}: + default: + // 达到最大并发数 + return + } + + w.queueMutex.Lock() + if len(w.queue) == 0 { + w.queueMutex.Unlock() + <-w.semaphore + return + } + + data := w.queue[0] + w.queue = w.queue[1:] + w.queueMutex.Unlock() + + // 检查是否有相同 WorkflowId 的 WorkflowRun 正在执行 + // 如果有,则重新排队,以保证同一个工作流同一时间内只有一个正在执行 + // 即不同 WorkflowId 的任务并行化,相同 WorkflowId 的任务串行化 + w.workerMutex.Lock() + if _, exists := w.workers[data.WorkflowId]; exists { + w.queueMutex.Lock() + w.queue = append(w.queue, data) + w.queueMutex.Unlock() + w.workerMutex.Unlock() + + <-w.semaphore + + continue + } + + ctx, cancel := context.WithCancel(context.Background()) + w.workers[data.WorkflowId] = &workflowWorker{data, cancel} + w.workerIdMap[data.RunId] = data.WorkflowId + w.workerMutex.Unlock() + + w.wg.Add(1) + go w.work(ctx, data) + } +} + +func (w *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) { + defer func() { + <-w.semaphore + w.workerMutex.Lock() + delete(w.workers, data.WorkflowId) + delete(w.workerIdMap, data.RunId) + w.workerMutex.Unlock() + + w.wg.Done() + + // 尝试取出排队中的其他 WorkflowRun 继续执行 + select { + case w.chCandi <- struct{}{}: + default: + } + }() + + // 查询 WorkflowRun + run, err := w.workflowRunRepo.GetById(ctx, data.RunId) + if err != nil { + if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err) + } + return + } else if run.Status != domain.WorkflowRunStatusTypePending { + return + } else if ctx.Err() != nil { + run.Status = domain.WorkflowRunStatusTypeCanceled + w.workflowRunRepo.Save(ctx, run) + return + } + + // 更新 WorkflowRun 状态为 Running + run.Status = domain.WorkflowRunStatusTypeRunning + if _, err := w.workflowRunRepo.Save(ctx, run); err != nil { + if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + panic(err) + } + return + } + + // 执行工作流 + invoker := newWorkflowInvoker(data) + if runErr := invoker.Invoke(ctx); runErr != nil { + if errors.Is(runErr, context.Canceled) { + run.Status = domain.WorkflowRunStatusTypeCanceled + run.Logs = invoker.GetLogs() + } else { + run.Status = domain.WorkflowRunStatusTypeFailed + run.EndedAt = time.Now() + run.Logs = invoker.GetLogs() + run.Error = runErr.Error() + } + + if _, err := w.workflowRunRepo.Save(ctx, run); err != nil { + if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + panic(err) + } + } + + return + } + + // 更新 WorkflowRun 状态为 Succeeded/Failed + run.EndedAt = time.Now() + run.Logs = invoker.GetLogs() + run.Error = domain.WorkflowRunLogs(invoker.GetLogs()).ErrorString() + if run.Error == "" { + run.Status = domain.WorkflowRunStatusTypeSucceeded + } else { + run.Status = domain.WorkflowRunStatusTypeFailed + } + if _, err := w.workflowRunRepo.Save(ctx, run); err != nil { + if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + panic(err) + } + } +} diff --git a/internal/workflow/dispatcher/invoker.go b/internal/workflow/dispatcher/invoker.go new file mode 100644 index 00000000..3033314e --- /dev/null +++ b/internal/workflow/dispatcher/invoker.go @@ -0,0 +1,104 @@ +package dispatcher + +import ( + "context" + "errors" + + "github.com/usual2970/certimate/internal/domain" + nodes "github.com/usual2970/certimate/internal/workflow/node-processor" +) + +type workflowInvoker struct { + workflowId string + workflowContent *domain.WorkflowNode + runId string + runLogs []domain.WorkflowRunLog +} + +func newWorkflowInvoker(data *WorkflowWorkerData) *workflowInvoker { + if data == nil { + panic("worker data is nil") + } + + return &workflowInvoker{ + workflowId: data.WorkflowId, + workflowContent: data.WorkflowContent, + runId: data.RunId, + runLogs: make([]domain.WorkflowRunLog, 0), + } +} + +func (w *workflowInvoker) Invoke(ctx context.Context) error { + ctx = context.WithValue(ctx, "workflow_id", w.workflowId) + ctx = context.WithValue(ctx, "workflow_run_id", w.runId) + return w.processNode(ctx, w.workflowContent) +} + +func (w *workflowInvoker) GetLogs() []domain.WorkflowRunLog { + return w.runLogs +} + +func (w *workflowInvoker) processNode(ctx context.Context, node *domain.WorkflowNode) error { + current := node + for current != nil { + if ctx.Err() != nil { + return ctx.Err() + } + + if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch { + for _, branch := range current.Branches { + if err := w.processNode(ctx, &branch); err != nil { + // 并行分支的某一分支发生错误时,忽略此错误,继续执行其他分支 + if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + continue + } + return err + } + } + } + + var processor nodes.NodeProcessor + var procErr error + for { + if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch { + processor, procErr = nodes.GetProcessor(current) + if procErr != nil { + break + } + + procErr = processor.Process(ctx) + log := processor.GetLog(ctx) + if log != nil { + w.runLogs = append(w.runLogs, *log) + } + if procErr != nil { + break + } + } + + break + } + + // TODO: 优化可读性 + if procErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch { + return procErr + } else if procErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch { + current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure) + } else if procErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch { + current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess) + } else { + current = current.Next + } + } + + return nil +} + +func (w *workflowInvoker) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode { + for _, branch := range branches { + if branch.Type == nodeType { + return &branch + } + } + return nil +} diff --git a/internal/workflow/dispatcher/singleton.go b/internal/workflow/dispatcher/singleton.go new file mode 100644 index 00000000..37c34f56 --- /dev/null +++ b/internal/workflow/dispatcher/singleton.go @@ -0,0 +1,31 @@ +package dispatcher + +import ( + "context" + "sync" + + "github.com/usual2970/certimate/internal/domain" +) + +type workflowRepository interface { + GetById(ctx context.Context, id string) (*domain.Workflow, error) + Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error) +} + +type workflowRunRepository interface { + GetById(ctx context.Context, id string) (*domain.WorkflowRun, error) + Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error) +} + +var ( + instance *WorkflowDispatcher + intanceOnce sync.Once +) + +func GetSingletonDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher { + intanceOnce.Do(func() { + instance = newWorkflowDispatcher(workflowRepo, workflowRunRepo) + }) + + return instance +} diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go index 39a0167f..d75486ea 100644 --- a/internal/workflow/node-processor/apply_node.go +++ b/internal/workflow/node-processor/apply_node.go @@ -80,7 +80,6 @@ func (n *applyNode) Process(ctx context.Context) error { certificate.PopulateFromX509(certX509) // 保存执行结果 - // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制 output := &domain.WorkflowOutput{ WorkflowId: getContextWorkflowId(ctx), RunId: getContextWorkflowRunId(ctx), @@ -124,7 +123,7 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24 expirationTime := time.Until(lastCertificate.ExpireAt) if expirationTime > renewalInterval { - return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(到期尚余 %d 天,预计不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) + return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(尚余 %d 天过期,不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) } } } diff --git a/internal/workflow/processor/processor.go b/internal/workflow/processor/processor.go deleted file mode 100644 index 39486419..00000000 --- a/internal/workflow/processor/processor.go +++ /dev/null @@ -1,90 +0,0 @@ -package processor - -import ( - "context" - - "github.com/usual2970/certimate/internal/domain" - nodes "github.com/usual2970/certimate/internal/workflow/node-processor" -) - -type workflowProcessor struct { - workflowId string - workflowContent *domain.WorkflowNode - runId string - runLogs []domain.WorkflowRunLog -} - -func NewWorkflowProcessor(workflowId string, workflowContent *domain.WorkflowNode, workflowRunId string) *workflowProcessor { - return &workflowProcessor{ - workflowId: workflowId, - workflowContent: workflowContent, - runId: workflowRunId, - runLogs: make([]domain.WorkflowRunLog, 0), - } -} - -func (w *workflowProcessor) Process(ctx context.Context) error { - ctx = context.WithValue(ctx, "workflow_id", w.workflowId) - ctx = context.WithValue(ctx, "workflow_run_id", w.runId) - return w.processNode(ctx, w.workflowContent) -} - -func (w *workflowProcessor) GetLogs() []domain.WorkflowRunLog { - return w.runLogs -} - -func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error { - current := node - for current != nil { - if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch { - for _, branch := range current.Branches { - if err := w.processNode(ctx, &branch); err != nil { - continue - } - } - } - - var processor nodes.NodeProcessor - var runErr error - for { - if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch { - processor, runErr = nodes.GetProcessor(current) - if runErr != nil { - break - } - - runErr = processor.Process(ctx) - log := processor.GetLog(ctx) - if log != nil { - w.runLogs = append(w.runLogs, *log) - } - if runErr != nil { - break - } - } - - break - } - - if runErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch { - return runErr - } else if runErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch { - current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure) - } else if runErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch { - current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess) - } else { - current = current.Next - } - } - - return nil -} - -func (w *workflowProcessor) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode { - for _, branch := range branches { - if branch.Type == nodeType { - return &branch - } - } - return nil -} diff --git a/internal/workflow/service.go b/internal/workflow/service.go index 8181c969..2d0a224d 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -4,23 +4,14 @@ import ( "context" "errors" "fmt" - "sync" "time" "github.com/usual2970/certimate/internal/app" "github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/domain/dtos" - processor "github.com/usual2970/certimate/internal/workflow/processor" + "github.com/usual2970/certimate/internal/workflow/dispatcher" ) -const defaultRoutines = 16 - -type workflowRunData struct { - WorkflowId string - WorkflowContent *domain.WorkflowNode - RunTrigger domain.WorkflowTriggerType -} - type workflowRepository interface { ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) GetById(ctx context.Context, id string) (*domain.Workflow, error) @@ -33,30 +24,19 @@ type workflowRunRepository interface { } type WorkflowService struct { - ch chan *workflowRunData - wg sync.WaitGroup - cancel context.CancelFunc + dispatcher *dispatcher.WorkflowDispatcher workflowRepo workflowRepository workflowRunRepo workflowRunRepository } func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService { - ctx, cancel := context.WithCancel(context.Background()) - srv := &WorkflowService{ - ch: make(chan *workflowRunData, 1), - cancel: cancel, + dispatcher: dispatcher.GetSingletonDispatcher(workflowRepo, workflowRunRepo), workflowRepo: workflowRepo, workflowRunRepo: workflowRunRepo, } - - srv.wg.Add(defaultRoutines) - for i := 0; i < defaultRoutines; i++ { - go srv.startRun(ctx) - } - return srv } @@ -75,7 +55,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error { }) }) if err != nil { - app.GetLogger().Error("failed to add schedule", "err", err) return err } } @@ -86,7 +65,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error { func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error { workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) if err != nil { - app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err) return err } @@ -94,69 +72,10 @@ func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartR return errors.New("workflow is already pending or running") } - s.ch <- &workflowRunData{ - WorkflowId: workflow.Id, - WorkflowContent: workflow.Content, - RunTrigger: req.RunTrigger, - } - - return nil -} - -func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error { - workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) - if err != nil { - app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err) - return err - } - - workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId) - if err != nil { - app.GetLogger().Error("failed to get workflow run", "id", req.RunId, "err", err) - return err - } else if workflowRun.WorkflowId != workflow.Id { - return errors.New("workflow run not found") - } else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning { - return errors.New("workflow run is not pending or running") - } - - // TODO: 取消运行,防止因为某些原因意外挂起(如进程被杀死)导致工作流一直处于 running 状态无法重新运行 - // workflowRun.Status = domain.WorkflowRunStatusTypeCanceled - // workflowRun.EndedAt = time.Now() - // if _, err := s.workflowRunRepo.Save(ctx, workflowRun); err != nil { - // return err - // } - - // return nil - - return errors.New("TODO: 尚未实现") -} - -func (s *WorkflowService) Stop(ctx context.Context) { - s.cancel() - s.wg.Wait() -} - -func (s *WorkflowService) startRun(ctx context.Context) { - defer s.wg.Done() - - for { - select { - case data := <-s.ch: - if err := s.startRunWithData(ctx, data); err != nil { - app.GetLogger().Error("failed to run workflow", "id", data.WorkflowId, "err", err) - } - case <-ctx.Done(): - return - } - } -} - -func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRunData) error { run := &domain.WorkflowRun{ - WorkflowId: data.WorkflowId, - Status: domain.WorkflowRunStatusTypeRunning, - Trigger: data.RunTrigger, + WorkflowId: workflow.Id, + Status: domain.WorkflowRunStatusTypePending, + Trigger: req.RunTrigger, StartedAt: time.Now(), } if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil { @@ -165,31 +84,35 @@ func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRu run = resp } - processor := processor.NewWorkflowProcessor(data.WorkflowId, data.WorkflowContent, run.Id) - if runErr := processor.Process(ctx); runErr != nil { - run.Status = domain.WorkflowRunStatusTypeFailed - run.EndedAt = time.Now() - run.Logs = processor.GetLogs() - run.Error = runErr.Error() - if _, err := s.workflowRunRepo.Save(ctx, run); err != nil { - app.GetLogger().Error("failed to save workflow run", "err", err) - } - - return fmt.Errorf("failed to run workflow: %w", runErr) - } - - run.EndedAt = time.Now() - run.Logs = processor.GetLogs() - run.Error = domain.WorkflowRunLogs(run.Logs).ErrorString() - if run.Error == "" { - run.Status = domain.WorkflowRunStatusTypeSucceeded - } else { - run.Status = domain.WorkflowRunStatusTypeFailed - } - if _, err := s.workflowRunRepo.Save(ctx, run); err != nil { - app.GetLogger().Error("failed to save workflow run", "err", err) - return err - } + s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{ + WorkflowId: workflow.Id, + WorkflowContent: workflow.Content, + RunId: run.Id, + }) return nil } + +func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error { + workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) + if err != nil { + return err + } + + workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId) + if err != nil { + return err + } else if workflowRun.WorkflowId != workflow.Id { + return errors.New("workflow run not found") + } else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning { + return errors.New("workflow run is not pending or running") + } + + s.dispatcher.Cancel(workflowRun.Id) + + return nil +} + +func (s *WorkflowService) Shutdown(ctx context.Context) { + s.dispatcher.Shutdown() +} diff --git a/ui/src/components/workflow/WorkflowRuns.tsx b/ui/src/components/workflow/WorkflowRuns.tsx index c90ab5a3..e670e102 100644 --- a/ui/src/components/workflow/WorkflowRuns.tsx +++ b/ui/src/components/workflow/WorkflowRuns.tsx @@ -5,9 +5,9 @@ import { ClockCircleOutlined as ClockCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon, - PauseCircleOutlined as PauseCircleOutlinedIcon, PauseOutlined as PauseOutlinedIcon, SelectOutlined as SelectOutlinedIcon, + StopOutlined as StopOutlinedIcon, SyncOutlined as SyncOutlinedIcon, } from "@ant-design/icons"; import { useRequest } from "ahooks"; @@ -75,7 +75,7 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => { ); } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { return ( - } color="warning"> + } color="warning"> {t("workflow_run.props.status.canceled")} ); diff --git a/ui/src/pages/dashboard/Dashboard.tsx b/ui/src/pages/dashboard/Dashboard.tsx index d80c8c10..b5c48cd2 100644 --- a/ui/src/pages/dashboard/Dashboard.tsx +++ b/ui/src/pages/dashboard/Dashboard.tsx @@ -7,10 +7,10 @@ import { ClockCircleOutlined as ClockCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon, LockOutlined as LockOutlinedIcon, - PauseCircleOutlined as PauseCircleOutlinedIcon, PlusOutlined as PlusOutlinedIcon, SelectOutlined as SelectOutlinedIcon, SendOutlined as SendOutlinedIcon, + StopOutlined as StopOutlinedIcon, SyncOutlined as SyncOutlinedIcon, } from "@ant-design/icons"; import { PageHeader } from "@ant-design/pro-components"; @@ -89,7 +89,6 @@ const Dashboard = () => { const workflow = record.expand?.workflowId; return ( { if (workflow) { @@ -129,7 +128,7 @@ const Dashboard = () => { ); } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { return ( - } color="warning"> + } color="warning"> {t("workflow_run.props.status.canceled")} ); diff --git a/ui/src/pages/workflows/WorkflowDetail.tsx b/ui/src/pages/workflows/WorkflowDetail.tsx index 4a2f8022..7db866e3 100644 --- a/ui/src/pages/workflows/WorkflowDetail.tsx +++ b/ui/src/pages/workflows/WorkflowDetail.tsx @@ -42,7 +42,6 @@ const WorkflowDetail = () => { useZustandShallowSelector(["workflow", "initialized", "init", "destroy", "setEnabled", "release", "discard"]) ); useEffect(() => { - // TODO: loading & error workflowState.init(workflowId!); return () => { @@ -52,7 +51,7 @@ const WorkflowDetail = () => { const [tabValue, setTabValue] = useState<"orchestration" | "runs">("orchestration"); - const [isRunning, setIsRunning] = useState(false); + const [isPendingOrRunning, setIsPendingOrRunning] = useState(false); const lastRunStatus = useMemo(() => workflow.lastRunStatus, [workflow]); const [allowDiscard, setAllowDiscard] = useState(false); @@ -60,14 +59,14 @@ const WorkflowDetail = () => { const [allowRun, setAllowRun] = useState(false); useEffect(() => { - setIsRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING); + setIsPendingOrRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING); }, [lastRunStatus]); useEffect(() => { - if (!!workflowId && isRunning) { + if (!!workflowId && isPendingOrRunning) { subscribeWorkflow(workflowId, (e) => { if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) { - setIsRunning(false); + setIsPendingOrRunning(false); unsubscribeWorkflow(workflowId); } }); @@ -76,15 +75,15 @@ const WorkflowDetail = () => { unsubscribeWorkflow(workflowId); }; } - }, [workflowId, isRunning]); + }, [workflowId, isPendingOrRunning]); useEffect(() => { const hasReleased = !!workflow.content; const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content); - setAllowDiscard(!isRunning && hasReleased && hasChanges); - setAllowRelease(!isRunning && hasChanges); + setAllowDiscard(!isPendingOrRunning && hasReleased && hasChanges); + setAllowRelease(!isPendingOrRunning && hasChanges); setAllowRun(hasReleased); - }, [workflow.content, workflow.draft, workflow.hasDraft, isRunning]); + }, [workflow.content, workflow.draft, workflow.hasDraft, isPendingOrRunning]); const handleEnableChange = async () => { if (!workflow.enabled && (!workflow.content || !isAllNodesValidated(workflow.content))) { @@ -174,12 +173,12 @@ const WorkflowDetail = () => { let unsubscribeFn: Awaited> | undefined = undefined; try { - setIsRunning(true); + setIsPendingOrRunning(true); // subscribe before running workflow unsubscribeFn = await subscribeWorkflow(workflowId!, (e) => { if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) { - setIsRunning(false); + setIsPendingOrRunning(false); unsubscribeFn?.(); } }); @@ -188,7 +187,7 @@ const WorkflowDetail = () => { messageApi.info(t("workflow.detail.orchestration.action.run.prompt")); } catch (err) { - setIsRunning(false); + setIsPendingOrRunning(false); unsubscribeFn?.(); console.error(err); @@ -279,7 +278,7 @@ const WorkflowDetail = () => {
- diff --git a/ui/src/pages/workflows/WorkflowList.tsx b/ui/src/pages/workflows/WorkflowList.tsx index 29dad70d..c0241413 100644 --- a/ui/src/pages/workflows/WorkflowList.tsx +++ b/ui/src/pages/workflows/WorkflowList.tsx @@ -7,8 +7,8 @@ import { CloseCircleOutlined as CloseCircleOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon, EditOutlined as EditOutlinedIcon, - PauseCircleOutlined as PauseCircleOutlinedIcon, PlusOutlined as PlusOutlinedIcon, + StopOutlined as StopOutlinedIcon, SyncOutlined as SyncOutlinedIcon, } from "@ant-design/icons"; @@ -170,7 +170,7 @@ const WorkflowList = () => { } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) { icon = ; } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) { - icon = ; + icon = ; } return (