diff --git a/internal/repository/workflow_run.go b/internal/repository/workflow_run.go
index 01185a45..b1a5234b 100644
--- a/internal/repository/workflow_run.go
+++ b/internal/repository/workflow_run.go
@@ -69,6 +69,13 @@ func (r *WorkflowRunRepository) Save(ctx context.Context, workflowRun *domain.Wo
workflowRecord, err := txApp.FindRecordById(domain.CollectionNameWorkflow, workflowRun.WorkflowId)
if err != nil {
return err
+ } else if workflowRun.Id == workflowRecord.GetString("lastRunId") {
+ workflowRecord.IgnoreUnchangedFields(true)
+ workflowRecord.Set("lastRunStatus", record.GetString("status"))
+ err = txApp.Save(workflowRecord)
+ if err != nil {
+ return err
+ }
} else if workflowRecord.GetDateTime("lastRunTime").Time().IsZero() || workflowRun.StartedAt.After(workflowRecord.GetDateTime("lastRunTime").Time()) {
workflowRecord.IgnoreUnchangedFields(true)
workflowRecord.Set("lastRunId", record.Id)
diff --git a/internal/rest/handlers/workflow.go b/internal/rest/handlers/workflow.go
index 83b3302b..bad474f0 100644
--- a/internal/rest/handlers/workflow.go
+++ b/internal/rest/handlers/workflow.go
@@ -13,7 +13,7 @@ import (
type workflowService interface {
StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error
CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error
- Stop(ctx context.Context)
+ Shutdown(ctx context.Context)
}
type WorkflowHandler struct {
diff --git a/internal/rest/routes/routes.go b/internal/rest/routes/routes.go
index 756760da..87fcb297 100644
--- a/internal/rest/routes/routes.go
+++ b/internal/rest/routes/routes.go
@@ -46,6 +46,6 @@ func Register(router *router.Router[*core.RequestEvent]) {
func Unregister() {
if workflowSvc != nil {
- workflowSvc.Stop(context.Background())
+ workflowSvc.Shutdown(context.Background())
}
}
diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go
new file mode 100644
index 00000000..0ecc0828
--- /dev/null
+++ b/internal/workflow/dispatcher/dispatcher.go
@@ -0,0 +1,274 @@
+package dispatcher
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/usual2970/certimate/internal/app"
+ "github.com/usual2970/certimate/internal/domain"
+ "github.com/usual2970/certimate/internal/pkg/utils/slices"
+)
+
+var maxWorkers = 16
+
+func init() {
+ envMaxWorkers := os.Getenv("CERTIMATE_WORKFLOW_MAX_WORKERS")
+ if n, err := strconv.Atoi(envMaxWorkers); err != nil && n > 0 {
+ maxWorkers = n
+ }
+}
+
+type workflowWorker struct {
+ Data *WorkflowWorkerData
+ Cancel context.CancelFunc
+}
+
+type WorkflowWorkerData struct {
+ WorkflowId string
+ WorkflowContent *domain.WorkflowNode
+ RunId string
+}
+
+type WorkflowDispatcher struct {
+ semaphore chan struct{}
+
+ queue []*WorkflowWorkerData
+ queueMutex sync.Mutex
+
+ workers map[string]*workflowWorker // key: WorkflowId
+ workerIdMap map[string]string // key: RunId, value: WorkflowId
+ workerMutex sync.Mutex
+
+ chWork chan *WorkflowWorkerData
+ chCandi chan struct{}
+
+ wg sync.WaitGroup
+
+ workflowRepo workflowRepository
+ workflowRunRepo workflowRunRepository
+}
+
+func newWorkflowDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
+ dispatcher := &WorkflowDispatcher{
+ semaphore: make(chan struct{}, maxWorkers),
+
+ queue: make([]*WorkflowWorkerData, 0),
+ queueMutex: sync.Mutex{},
+
+ workers: make(map[string]*workflowWorker),
+ workerIdMap: make(map[string]string),
+ workerMutex: sync.Mutex{},
+
+ chWork: make(chan *WorkflowWorkerData),
+ chCandi: make(chan struct{}, 1),
+
+ workflowRepo: workflowRepo,
+ workflowRunRepo: workflowRunRepo,
+ }
+
+ go func() {
+ for {
+ select {
+ case <-dispatcher.chWork:
+ dispatcher.dequeueWorker()
+
+ case <-dispatcher.chCandi:
+ dispatcher.dequeueWorker()
+ }
+ }
+ }()
+
+ return dispatcher
+}
+
+func (w *WorkflowDispatcher) Dispatch(data *WorkflowWorkerData) {
+ if data == nil {
+ panic("worker data is nil")
+ }
+
+ w.enqueueWorker(data)
+ select {
+ case w.chWork <- data:
+ default:
+ }
+}
+
+func (w *WorkflowDispatcher) Cancel(runId string) {
+ hasWorker := false
+
+ // 取消正在执行的 WorkflowRun
+ w.workerMutex.Lock()
+ if workflowId, ok := w.workerIdMap[runId]; ok {
+ if worker, ok := w.workers[workflowId]; ok {
+ hasWorker = true
+ worker.Cancel()
+ delete(w.workers, workflowId)
+ delete(w.workerIdMap, runId)
+ }
+ }
+ w.workerMutex.Unlock()
+
+ // 移除排队中的 WorkflowRun
+ w.queueMutex.Lock()
+ w.queue = slices.Filter(w.queue, func(d *WorkflowWorkerData) bool {
+ return d.RunId != runId
+ })
+ w.queueMutex.Unlock()
+
+ // 已挂起,查询 WorkflowRun 并更新其状态为 Canceled
+ if !hasWorker {
+ if run, err := w.workflowRunRepo.GetById(context.Background(), runId); err == nil {
+ if run.Status == domain.WorkflowRunStatusTypePending || run.Status == domain.WorkflowRunStatusTypeRunning {
+ run.Status = domain.WorkflowRunStatusTypeCanceled
+ w.workflowRunRepo.Save(context.Background(), run)
+ }
+ }
+ }
+}
+
+func (w *WorkflowDispatcher) Shutdown() {
+ // 清空排队中的 WorkflowRun
+ w.queueMutex.Lock()
+ w.queue = make([]*WorkflowWorkerData, 0)
+ w.queueMutex.Unlock()
+
+ // 等待所有正在执行的 WorkflowRun 完成
+ w.wg.Wait()
+ w.workers = make(map[string]*workflowWorker)
+ w.workerIdMap = make(map[string]string)
+}
+
+func (w *WorkflowDispatcher) enqueueWorker(data *WorkflowWorkerData) {
+ w.queueMutex.Lock()
+ defer w.queueMutex.Unlock()
+ w.queue = append(w.queue, data)
+}
+
+func (w *WorkflowDispatcher) dequeueWorker() {
+ for {
+ select {
+ case w.semaphore <- struct{}{}:
+ default:
+ // 达到最大并发数
+ return
+ }
+
+ w.queueMutex.Lock()
+ if len(w.queue) == 0 {
+ w.queueMutex.Unlock()
+ <-w.semaphore
+ return
+ }
+
+ data := w.queue[0]
+ w.queue = w.queue[1:]
+ w.queueMutex.Unlock()
+
+ // 检查是否有相同 WorkflowId 的 WorkflowRun 正在执行
+ // 如果有,则重新排队,以保证同一个工作流同一时间内只有一个正在执行
+ // 即不同 WorkflowId 的任务并行化,相同 WorkflowId 的任务串行化
+ w.workerMutex.Lock()
+ if _, exists := w.workers[data.WorkflowId]; exists {
+ w.queueMutex.Lock()
+ w.queue = append(w.queue, data)
+ w.queueMutex.Unlock()
+ w.workerMutex.Unlock()
+
+ <-w.semaphore
+
+ continue
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ w.workers[data.WorkflowId] = &workflowWorker{data, cancel}
+ w.workerIdMap[data.RunId] = data.WorkflowId
+ w.workerMutex.Unlock()
+
+ w.wg.Add(1)
+ go w.work(ctx, data)
+ }
+}
+
+func (w *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) {
+ defer func() {
+ <-w.semaphore
+ w.workerMutex.Lock()
+ delete(w.workers, data.WorkflowId)
+ delete(w.workerIdMap, data.RunId)
+ w.workerMutex.Unlock()
+
+ w.wg.Done()
+
+ // 尝试取出排队中的其他 WorkflowRun 继续执行
+ select {
+ case w.chCandi <- struct{}{}:
+ default:
+ }
+ }()
+
+ // 查询 WorkflowRun
+ run, err := w.workflowRunRepo.GetById(ctx, data.RunId)
+ if err != nil {
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err)
+ }
+ return
+ } else if run.Status != domain.WorkflowRunStatusTypePending {
+ return
+ } else if ctx.Err() != nil {
+ run.Status = domain.WorkflowRunStatusTypeCanceled
+ w.workflowRunRepo.Save(ctx, run)
+ return
+ }
+
+ // 更新 WorkflowRun 状态为 Running
+ run.Status = domain.WorkflowRunStatusTypeRunning
+ if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ panic(err)
+ }
+ return
+ }
+
+ // 执行工作流
+ invoker := newWorkflowInvoker(data)
+ if runErr := invoker.Invoke(ctx); runErr != nil {
+ if errors.Is(runErr, context.Canceled) {
+ run.Status = domain.WorkflowRunStatusTypeCanceled
+ run.Logs = invoker.GetLogs()
+ } else {
+ run.Status = domain.WorkflowRunStatusTypeFailed
+ run.EndedAt = time.Now()
+ run.Logs = invoker.GetLogs()
+ run.Error = runErr.Error()
+ }
+
+ if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ panic(err)
+ }
+ }
+
+ return
+ }
+
+ // 更新 WorkflowRun 状态为 Succeeded/Failed
+ run.EndedAt = time.Now()
+ run.Logs = invoker.GetLogs()
+ run.Error = domain.WorkflowRunLogs(invoker.GetLogs()).ErrorString()
+ if run.Error == "" {
+ run.Status = domain.WorkflowRunStatusTypeSucceeded
+ } else {
+ run.Status = domain.WorkflowRunStatusTypeFailed
+ }
+ if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ panic(err)
+ }
+ }
+}
diff --git a/internal/workflow/dispatcher/invoker.go b/internal/workflow/dispatcher/invoker.go
new file mode 100644
index 00000000..3033314e
--- /dev/null
+++ b/internal/workflow/dispatcher/invoker.go
@@ -0,0 +1,104 @@
+package dispatcher
+
+import (
+ "context"
+ "errors"
+
+ "github.com/usual2970/certimate/internal/domain"
+ nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
+)
+
+type workflowInvoker struct {
+ workflowId string
+ workflowContent *domain.WorkflowNode
+ runId string
+ runLogs []domain.WorkflowRunLog
+}
+
+func newWorkflowInvoker(data *WorkflowWorkerData) *workflowInvoker {
+ if data == nil {
+ panic("worker data is nil")
+ }
+
+ return &workflowInvoker{
+ workflowId: data.WorkflowId,
+ workflowContent: data.WorkflowContent,
+ runId: data.RunId,
+ runLogs: make([]domain.WorkflowRunLog, 0),
+ }
+}
+
+func (w *workflowInvoker) Invoke(ctx context.Context) error {
+ ctx = context.WithValue(ctx, "workflow_id", w.workflowId)
+ ctx = context.WithValue(ctx, "workflow_run_id", w.runId)
+ return w.processNode(ctx, w.workflowContent)
+}
+
+func (w *workflowInvoker) GetLogs() []domain.WorkflowRunLog {
+ return w.runLogs
+}
+
+func (w *workflowInvoker) processNode(ctx context.Context, node *domain.WorkflowNode) error {
+ current := node
+ for current != nil {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+
+ if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
+ for _, branch := range current.Branches {
+ if err := w.processNode(ctx, &branch); err != nil {
+ // 并行分支的某一分支发生错误时,忽略此错误,继续执行其他分支
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ continue
+ }
+ return err
+ }
+ }
+ }
+
+ var processor nodes.NodeProcessor
+ var procErr error
+ for {
+ if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
+ processor, procErr = nodes.GetProcessor(current)
+ if procErr != nil {
+ break
+ }
+
+ procErr = processor.Process(ctx)
+ log := processor.GetLog(ctx)
+ if log != nil {
+ w.runLogs = append(w.runLogs, *log)
+ }
+ if procErr != nil {
+ break
+ }
+ }
+
+ break
+ }
+
+ // TODO: 优化可读性
+ if procErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch {
+ return procErr
+ } else if procErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
+ current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure)
+ } else if procErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
+ current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess)
+ } else {
+ current = current.Next
+ }
+ }
+
+ return nil
+}
+
+func (w *workflowInvoker) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode {
+ for _, branch := range branches {
+ if branch.Type == nodeType {
+ return &branch
+ }
+ }
+ return nil
+}
diff --git a/internal/workflow/dispatcher/singleton.go b/internal/workflow/dispatcher/singleton.go
new file mode 100644
index 00000000..37c34f56
--- /dev/null
+++ b/internal/workflow/dispatcher/singleton.go
@@ -0,0 +1,31 @@
+package dispatcher
+
+import (
+ "context"
+ "sync"
+
+ "github.com/usual2970/certimate/internal/domain"
+)
+
+type workflowRepository interface {
+ GetById(ctx context.Context, id string) (*domain.Workflow, error)
+ Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error)
+}
+
+type workflowRunRepository interface {
+ GetById(ctx context.Context, id string) (*domain.WorkflowRun, error)
+ Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
+}
+
+var (
+ instance *WorkflowDispatcher
+ intanceOnce sync.Once
+)
+
+func GetSingletonDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
+ intanceOnce.Do(func() {
+ instance = newWorkflowDispatcher(workflowRepo, workflowRunRepo)
+ })
+
+ return instance
+}
diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go
index 39a0167f..d75486ea 100644
--- a/internal/workflow/node-processor/apply_node.go
+++ b/internal/workflow/node-processor/apply_node.go
@@ -80,7 +80,6 @@ func (n *applyNode) Process(ctx context.Context) error {
certificate.PopulateFromX509(certX509)
// 保存执行结果
- // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
output := &domain.WorkflowOutput{
WorkflowId: getContextWorkflowId(ctx),
RunId: getContextWorkflowRunId(ctx),
@@ -124,7 +123,7 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
expirationTime := time.Until(lastCertificate.ExpireAt)
if expirationTime > renewalInterval {
- return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(到期尚余 %d 天,预计不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays)
+ return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(尚余 %d 天过期,不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays)
}
}
}
diff --git a/internal/workflow/processor/processor.go b/internal/workflow/processor/processor.go
deleted file mode 100644
index 39486419..00000000
--- a/internal/workflow/processor/processor.go
+++ /dev/null
@@ -1,90 +0,0 @@
-package processor
-
-import (
- "context"
-
- "github.com/usual2970/certimate/internal/domain"
- nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
-)
-
-type workflowProcessor struct {
- workflowId string
- workflowContent *domain.WorkflowNode
- runId string
- runLogs []domain.WorkflowRunLog
-}
-
-func NewWorkflowProcessor(workflowId string, workflowContent *domain.WorkflowNode, workflowRunId string) *workflowProcessor {
- return &workflowProcessor{
- workflowId: workflowId,
- workflowContent: workflowContent,
- runId: workflowRunId,
- runLogs: make([]domain.WorkflowRunLog, 0),
- }
-}
-
-func (w *workflowProcessor) Process(ctx context.Context) error {
- ctx = context.WithValue(ctx, "workflow_id", w.workflowId)
- ctx = context.WithValue(ctx, "workflow_run_id", w.runId)
- return w.processNode(ctx, w.workflowContent)
-}
-
-func (w *workflowProcessor) GetLogs() []domain.WorkflowRunLog {
- return w.runLogs
-}
-
-func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error {
- current := node
- for current != nil {
- if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
- for _, branch := range current.Branches {
- if err := w.processNode(ctx, &branch); err != nil {
- continue
- }
- }
- }
-
- var processor nodes.NodeProcessor
- var runErr error
- for {
- if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
- processor, runErr = nodes.GetProcessor(current)
- if runErr != nil {
- break
- }
-
- runErr = processor.Process(ctx)
- log := processor.GetLog(ctx)
- if log != nil {
- w.runLogs = append(w.runLogs, *log)
- }
- if runErr != nil {
- break
- }
- }
-
- break
- }
-
- if runErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch {
- return runErr
- } else if runErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
- current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure)
- } else if runErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
- current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess)
- } else {
- current = current.Next
- }
- }
-
- return nil
-}
-
-func (w *workflowProcessor) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode {
- for _, branch := range branches {
- if branch.Type == nodeType {
- return &branch
- }
- }
- return nil
-}
diff --git a/internal/workflow/service.go b/internal/workflow/service.go
index 8181c969..2d0a224d 100644
--- a/internal/workflow/service.go
+++ b/internal/workflow/service.go
@@ -4,23 +4,14 @@ import (
"context"
"errors"
"fmt"
- "sync"
"time"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/dtos"
- processor "github.com/usual2970/certimate/internal/workflow/processor"
+ "github.com/usual2970/certimate/internal/workflow/dispatcher"
)
-const defaultRoutines = 16
-
-type workflowRunData struct {
- WorkflowId string
- WorkflowContent *domain.WorkflowNode
- RunTrigger domain.WorkflowTriggerType
-}
-
type workflowRepository interface {
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
GetById(ctx context.Context, id string) (*domain.Workflow, error)
@@ -33,30 +24,19 @@ type workflowRunRepository interface {
}
type WorkflowService struct {
- ch chan *workflowRunData
- wg sync.WaitGroup
- cancel context.CancelFunc
+ dispatcher *dispatcher.WorkflowDispatcher
workflowRepo workflowRepository
workflowRunRepo workflowRunRepository
}
func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService {
- ctx, cancel := context.WithCancel(context.Background())
-
srv := &WorkflowService{
- ch: make(chan *workflowRunData, 1),
- cancel: cancel,
+ dispatcher: dispatcher.GetSingletonDispatcher(workflowRepo, workflowRunRepo),
workflowRepo: workflowRepo,
workflowRunRepo: workflowRunRepo,
}
-
- srv.wg.Add(defaultRoutines)
- for i := 0; i < defaultRoutines; i++ {
- go srv.startRun(ctx)
- }
-
return srv
}
@@ -75,7 +55,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
})
})
if err != nil {
- app.GetLogger().Error("failed to add schedule", "err", err)
return err
}
}
@@ -86,7 +65,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil {
- app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
return err
}
@@ -94,69 +72,10 @@ func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartR
return errors.New("workflow is already pending or running")
}
- s.ch <- &workflowRunData{
- WorkflowId: workflow.Id,
- WorkflowContent: workflow.Content,
- RunTrigger: req.RunTrigger,
- }
-
- return nil
-}
-
-func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
- workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
- if err != nil {
- app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
- return err
- }
-
- workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
- if err != nil {
- app.GetLogger().Error("failed to get workflow run", "id", req.RunId, "err", err)
- return err
- } else if workflowRun.WorkflowId != workflow.Id {
- return errors.New("workflow run not found")
- } else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
- return errors.New("workflow run is not pending or running")
- }
-
- // TODO: 取消运行,防止因为某些原因意外挂起(如进程被杀死)导致工作流一直处于 running 状态无法重新运行
- // workflowRun.Status = domain.WorkflowRunStatusTypeCanceled
- // workflowRun.EndedAt = time.Now()
- // if _, err := s.workflowRunRepo.Save(ctx, workflowRun); err != nil {
- // return err
- // }
-
- // return nil
-
- return errors.New("TODO: 尚未实现")
-}
-
-func (s *WorkflowService) Stop(ctx context.Context) {
- s.cancel()
- s.wg.Wait()
-}
-
-func (s *WorkflowService) startRun(ctx context.Context) {
- defer s.wg.Done()
-
- for {
- select {
- case data := <-s.ch:
- if err := s.startRunWithData(ctx, data); err != nil {
- app.GetLogger().Error("failed to run workflow", "id", data.WorkflowId, "err", err)
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRunData) error {
run := &domain.WorkflowRun{
- WorkflowId: data.WorkflowId,
- Status: domain.WorkflowRunStatusTypeRunning,
- Trigger: data.RunTrigger,
+ WorkflowId: workflow.Id,
+ Status: domain.WorkflowRunStatusTypePending,
+ Trigger: req.RunTrigger,
StartedAt: time.Now(),
}
if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil {
@@ -165,31 +84,35 @@ func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRu
run = resp
}
- processor := processor.NewWorkflowProcessor(data.WorkflowId, data.WorkflowContent, run.Id)
- if runErr := processor.Process(ctx); runErr != nil {
- run.Status = domain.WorkflowRunStatusTypeFailed
- run.EndedAt = time.Now()
- run.Logs = processor.GetLogs()
- run.Error = runErr.Error()
- if _, err := s.workflowRunRepo.Save(ctx, run); err != nil {
- app.GetLogger().Error("failed to save workflow run", "err", err)
- }
-
- return fmt.Errorf("failed to run workflow: %w", runErr)
- }
-
- run.EndedAt = time.Now()
- run.Logs = processor.GetLogs()
- run.Error = domain.WorkflowRunLogs(run.Logs).ErrorString()
- if run.Error == "" {
- run.Status = domain.WorkflowRunStatusTypeSucceeded
- } else {
- run.Status = domain.WorkflowRunStatusTypeFailed
- }
- if _, err := s.workflowRunRepo.Save(ctx, run); err != nil {
- app.GetLogger().Error("failed to save workflow run", "err", err)
- return err
- }
+ s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{
+ WorkflowId: workflow.Id,
+ WorkflowContent: workflow.Content,
+ RunId: run.Id,
+ })
return nil
}
+
+func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
+ workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
+ if err != nil {
+ return err
+ }
+
+ workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
+ if err != nil {
+ return err
+ } else if workflowRun.WorkflowId != workflow.Id {
+ return errors.New("workflow run not found")
+ } else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
+ return errors.New("workflow run is not pending or running")
+ }
+
+ s.dispatcher.Cancel(workflowRun.Id)
+
+ return nil
+}
+
+func (s *WorkflowService) Shutdown(ctx context.Context) {
+ s.dispatcher.Shutdown()
+}
diff --git a/ui/src/components/workflow/WorkflowRuns.tsx b/ui/src/components/workflow/WorkflowRuns.tsx
index c90ab5a3..e670e102 100644
--- a/ui/src/components/workflow/WorkflowRuns.tsx
+++ b/ui/src/components/workflow/WorkflowRuns.tsx
@@ -5,9 +5,9 @@ import {
ClockCircleOutlined as ClockCircleOutlinedIcon,
CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon,
- PauseCircleOutlined as PauseCircleOutlinedIcon,
PauseOutlined as PauseOutlinedIcon,
SelectOutlined as SelectOutlinedIcon,
+ StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons";
import { useRequest } from "ahooks";
@@ -75,7 +75,7 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
);
} else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) {
return (
- } color="warning">
+ } color="warning">
{t("workflow_run.props.status.canceled")}
);
diff --git a/ui/src/pages/dashboard/Dashboard.tsx b/ui/src/pages/dashboard/Dashboard.tsx
index d80c8c10..b5c48cd2 100644
--- a/ui/src/pages/dashboard/Dashboard.tsx
+++ b/ui/src/pages/dashboard/Dashboard.tsx
@@ -7,10 +7,10 @@ import {
ClockCircleOutlined as ClockCircleOutlinedIcon,
CloseCircleOutlined as CloseCircleOutlinedIcon,
LockOutlined as LockOutlinedIcon,
- PauseCircleOutlined as PauseCircleOutlinedIcon,
PlusOutlined as PlusOutlinedIcon,
SelectOutlined as SelectOutlinedIcon,
SendOutlined as SendOutlinedIcon,
+ StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons";
import { PageHeader } from "@ant-design/pro-components";
@@ -89,7 +89,6 @@ const Dashboard = () => {
const workflow = record.expand?.workflowId;
return (
{
if (workflow) {
@@ -129,7 +128,7 @@ const Dashboard = () => {
);
} else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) {
return (
- } color="warning">
+ } color="warning">
{t("workflow_run.props.status.canceled")}
);
diff --git a/ui/src/pages/workflows/WorkflowDetail.tsx b/ui/src/pages/workflows/WorkflowDetail.tsx
index 4a2f8022..7db866e3 100644
--- a/ui/src/pages/workflows/WorkflowDetail.tsx
+++ b/ui/src/pages/workflows/WorkflowDetail.tsx
@@ -42,7 +42,6 @@ const WorkflowDetail = () => {
useZustandShallowSelector(["workflow", "initialized", "init", "destroy", "setEnabled", "release", "discard"])
);
useEffect(() => {
- // TODO: loading & error
workflowState.init(workflowId!);
return () => {
@@ -52,7 +51,7 @@ const WorkflowDetail = () => {
const [tabValue, setTabValue] = useState<"orchestration" | "runs">("orchestration");
- const [isRunning, setIsRunning] = useState(false);
+ const [isPendingOrRunning, setIsPendingOrRunning] = useState(false);
const lastRunStatus = useMemo(() => workflow.lastRunStatus, [workflow]);
const [allowDiscard, setAllowDiscard] = useState(false);
@@ -60,14 +59,14 @@ const WorkflowDetail = () => {
const [allowRun, setAllowRun] = useState(false);
useEffect(() => {
- setIsRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING);
+ setIsPendingOrRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING);
}, [lastRunStatus]);
useEffect(() => {
- if (!!workflowId && isRunning) {
+ if (!!workflowId && isPendingOrRunning) {
subscribeWorkflow(workflowId, (e) => {
if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) {
- setIsRunning(false);
+ setIsPendingOrRunning(false);
unsubscribeWorkflow(workflowId);
}
});
@@ -76,15 +75,15 @@ const WorkflowDetail = () => {
unsubscribeWorkflow(workflowId);
};
}
- }, [workflowId, isRunning]);
+ }, [workflowId, isPendingOrRunning]);
useEffect(() => {
const hasReleased = !!workflow.content;
const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content);
- setAllowDiscard(!isRunning && hasReleased && hasChanges);
- setAllowRelease(!isRunning && hasChanges);
+ setAllowDiscard(!isPendingOrRunning && hasReleased && hasChanges);
+ setAllowRelease(!isPendingOrRunning && hasChanges);
setAllowRun(hasReleased);
- }, [workflow.content, workflow.draft, workflow.hasDraft, isRunning]);
+ }, [workflow.content, workflow.draft, workflow.hasDraft, isPendingOrRunning]);
const handleEnableChange = async () => {
if (!workflow.enabled && (!workflow.content || !isAllNodesValidated(workflow.content))) {
@@ -174,12 +173,12 @@ const WorkflowDetail = () => {
let unsubscribeFn: Awaited> | undefined = undefined;
try {
- setIsRunning(true);
+ setIsPendingOrRunning(true);
// subscribe before running workflow
unsubscribeFn = await subscribeWorkflow(workflowId!, (e) => {
if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) {
- setIsRunning(false);
+ setIsPendingOrRunning(false);
unsubscribeFn?.();
}
});
@@ -188,7 +187,7 @@ const WorkflowDetail = () => {
messageApi.info(t("workflow.detail.orchestration.action.run.prompt"));
} catch (err) {
- setIsRunning(false);
+ setIsPendingOrRunning(false);
unsubscribeFn?.();
console.error(err);
@@ -279,7 +278,7 @@ const WorkflowDetail = () => {
- } loading={isRunning} type="primary" onClick={handleRunClick}>
+ } loading={isPendingOrRunning} type="primary" onClick={handleRunClick}>
{t("workflow.detail.orchestration.action.run")}
diff --git a/ui/src/pages/workflows/WorkflowList.tsx b/ui/src/pages/workflows/WorkflowList.tsx
index 29dad70d..c0241413 100644
--- a/ui/src/pages/workflows/WorkflowList.tsx
+++ b/ui/src/pages/workflows/WorkflowList.tsx
@@ -7,8 +7,8 @@ import {
CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon,
EditOutlined as EditOutlinedIcon,
- PauseCircleOutlined as PauseCircleOutlinedIcon,
PlusOutlined as PlusOutlinedIcon,
+ StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons";
@@ -170,7 +170,7 @@ const WorkflowList = () => {
} else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) {
icon = ;
} else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) {
- icon = ;
+ icon = ;
}
return (