feat: workflow run dispatcher

This commit is contained in:
Fu Diwei 2025-02-08 22:44:13 +08:00
parent b9e28db089
commit 0bc40fd676
13 changed files with 472 additions and 226 deletions

View File

@ -69,6 +69,13 @@ func (r *WorkflowRunRepository) Save(ctx context.Context, workflowRun *domain.Wo
workflowRecord, err := txApp.FindRecordById(domain.CollectionNameWorkflow, workflowRun.WorkflowId) workflowRecord, err := txApp.FindRecordById(domain.CollectionNameWorkflow, workflowRun.WorkflowId)
if err != nil { if err != nil {
return err return err
} else if workflowRun.Id == workflowRecord.GetString("lastRunId") {
workflowRecord.IgnoreUnchangedFields(true)
workflowRecord.Set("lastRunStatus", record.GetString("status"))
err = txApp.Save(workflowRecord)
if err != nil {
return err
}
} else if workflowRecord.GetDateTime("lastRunTime").Time().IsZero() || workflowRun.StartedAt.After(workflowRecord.GetDateTime("lastRunTime").Time()) { } else if workflowRecord.GetDateTime("lastRunTime").Time().IsZero() || workflowRun.StartedAt.After(workflowRecord.GetDateTime("lastRunTime").Time()) {
workflowRecord.IgnoreUnchangedFields(true) workflowRecord.IgnoreUnchangedFields(true)
workflowRecord.Set("lastRunId", record.Id) workflowRecord.Set("lastRunId", record.Id)

View File

@ -13,7 +13,7 @@ import (
type workflowService interface { type workflowService interface {
StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error
CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error
Stop(ctx context.Context) Shutdown(ctx context.Context)
} }
type WorkflowHandler struct { type WorkflowHandler struct {

View File

@ -46,6 +46,6 @@ func Register(router *router.Router[*core.RequestEvent]) {
func Unregister() { func Unregister() {
if workflowSvc != nil { if workflowSvc != nil {
workflowSvc.Stop(context.Background()) workflowSvc.Shutdown(context.Background())
} }
} }

View File

@ -0,0 +1,274 @@
package dispatcher
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"sync"
"time"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/pkg/utils/slices"
)
var maxWorkers = 16
func init() {
envMaxWorkers := os.Getenv("CERTIMATE_WORKFLOW_MAX_WORKERS")
if n, err := strconv.Atoi(envMaxWorkers); err != nil && n > 0 {
maxWorkers = n
}
}
type workflowWorker struct {
Data *WorkflowWorkerData
Cancel context.CancelFunc
}
type WorkflowWorkerData struct {
WorkflowId string
WorkflowContent *domain.WorkflowNode
RunId string
}
type WorkflowDispatcher struct {
semaphore chan struct{}
queue []*WorkflowWorkerData
queueMutex sync.Mutex
workers map[string]*workflowWorker // key: WorkflowId
workerIdMap map[string]string // key: RunId, value: WorkflowId
workerMutex sync.Mutex
chWork chan *WorkflowWorkerData
chCandi chan struct{}
wg sync.WaitGroup
workflowRepo workflowRepository
workflowRunRepo workflowRunRepository
}
func newWorkflowDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
dispatcher := &WorkflowDispatcher{
semaphore: make(chan struct{}, maxWorkers),
queue: make([]*WorkflowWorkerData, 0),
queueMutex: sync.Mutex{},
workers: make(map[string]*workflowWorker),
workerIdMap: make(map[string]string),
workerMutex: sync.Mutex{},
chWork: make(chan *WorkflowWorkerData),
chCandi: make(chan struct{}, 1),
workflowRepo: workflowRepo,
workflowRunRepo: workflowRunRepo,
}
go func() {
for {
select {
case <-dispatcher.chWork:
dispatcher.dequeueWorker()
case <-dispatcher.chCandi:
dispatcher.dequeueWorker()
}
}
}()
return dispatcher
}
func (w *WorkflowDispatcher) Dispatch(data *WorkflowWorkerData) {
if data == nil {
panic("worker data is nil")
}
w.enqueueWorker(data)
select {
case w.chWork <- data:
default:
}
}
func (w *WorkflowDispatcher) Cancel(runId string) {
hasWorker := false
// 取消正在执行的 WorkflowRun
w.workerMutex.Lock()
if workflowId, ok := w.workerIdMap[runId]; ok {
if worker, ok := w.workers[workflowId]; ok {
hasWorker = true
worker.Cancel()
delete(w.workers, workflowId)
delete(w.workerIdMap, runId)
}
}
w.workerMutex.Unlock()
// 移除排队中的 WorkflowRun
w.queueMutex.Lock()
w.queue = slices.Filter(w.queue, func(d *WorkflowWorkerData) bool {
return d.RunId != runId
})
w.queueMutex.Unlock()
// 已挂起,查询 WorkflowRun 并更新其状态为 Canceled
if !hasWorker {
if run, err := w.workflowRunRepo.GetById(context.Background(), runId); err == nil {
if run.Status == domain.WorkflowRunStatusTypePending || run.Status == domain.WorkflowRunStatusTypeRunning {
run.Status = domain.WorkflowRunStatusTypeCanceled
w.workflowRunRepo.Save(context.Background(), run)
}
}
}
}
func (w *WorkflowDispatcher) Shutdown() {
// 清空排队中的 WorkflowRun
w.queueMutex.Lock()
w.queue = make([]*WorkflowWorkerData, 0)
w.queueMutex.Unlock()
// 等待所有正在执行的 WorkflowRun 完成
w.wg.Wait()
w.workers = make(map[string]*workflowWorker)
w.workerIdMap = make(map[string]string)
}
func (w *WorkflowDispatcher) enqueueWorker(data *WorkflowWorkerData) {
w.queueMutex.Lock()
defer w.queueMutex.Unlock()
w.queue = append(w.queue, data)
}
func (w *WorkflowDispatcher) dequeueWorker() {
for {
select {
case w.semaphore <- struct{}{}:
default:
// 达到最大并发数
return
}
w.queueMutex.Lock()
if len(w.queue) == 0 {
w.queueMutex.Unlock()
<-w.semaphore
return
}
data := w.queue[0]
w.queue = w.queue[1:]
w.queueMutex.Unlock()
// 检查是否有相同 WorkflowId 的 WorkflowRun 正在执行
// 如果有,则重新排队,以保证同一个工作流同一时间内只有一个正在执行
// 即不同 WorkflowId 的任务并行化,相同 WorkflowId 的任务串行化
w.workerMutex.Lock()
if _, exists := w.workers[data.WorkflowId]; exists {
w.queueMutex.Lock()
w.queue = append(w.queue, data)
w.queueMutex.Unlock()
w.workerMutex.Unlock()
<-w.semaphore
continue
}
ctx, cancel := context.WithCancel(context.Background())
w.workers[data.WorkflowId] = &workflowWorker{data, cancel}
w.workerIdMap[data.RunId] = data.WorkflowId
w.workerMutex.Unlock()
w.wg.Add(1)
go w.work(ctx, data)
}
}
func (w *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) {
defer func() {
<-w.semaphore
w.workerMutex.Lock()
delete(w.workers, data.WorkflowId)
delete(w.workerIdMap, data.RunId)
w.workerMutex.Unlock()
w.wg.Done()
// 尝试取出排队中的其他 WorkflowRun 继续执行
select {
case w.chCandi <- struct{}{}:
default:
}
}()
// 查询 WorkflowRun
run, err := w.workflowRunRepo.GetById(ctx, data.RunId)
if err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err)
}
return
} else if run.Status != domain.WorkflowRunStatusTypePending {
return
} else if ctx.Err() != nil {
run.Status = domain.WorkflowRunStatusTypeCanceled
w.workflowRunRepo.Save(ctx, run)
return
}
// 更新 WorkflowRun 状态为 Running
run.Status = domain.WorkflowRunStatusTypeRunning
if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
panic(err)
}
return
}
// 执行工作流
invoker := newWorkflowInvoker(data)
if runErr := invoker.Invoke(ctx); runErr != nil {
if errors.Is(runErr, context.Canceled) {
run.Status = domain.WorkflowRunStatusTypeCanceled
run.Logs = invoker.GetLogs()
} else {
run.Status = domain.WorkflowRunStatusTypeFailed
run.EndedAt = time.Now()
run.Logs = invoker.GetLogs()
run.Error = runErr.Error()
}
if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
panic(err)
}
}
return
}
// 更新 WorkflowRun 状态为 Succeeded/Failed
run.EndedAt = time.Now()
run.Logs = invoker.GetLogs()
run.Error = domain.WorkflowRunLogs(invoker.GetLogs()).ErrorString()
if run.Error == "" {
run.Status = domain.WorkflowRunStatusTypeSucceeded
} else {
run.Status = domain.WorkflowRunStatusTypeFailed
}
if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
panic(err)
}
}
}

View File

@ -0,0 +1,104 @@
package dispatcher
import (
"context"
"errors"
"github.com/usual2970/certimate/internal/domain"
nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
)
type workflowInvoker struct {
workflowId string
workflowContent *domain.WorkflowNode
runId string
runLogs []domain.WorkflowRunLog
}
func newWorkflowInvoker(data *WorkflowWorkerData) *workflowInvoker {
if data == nil {
panic("worker data is nil")
}
return &workflowInvoker{
workflowId: data.WorkflowId,
workflowContent: data.WorkflowContent,
runId: data.RunId,
runLogs: make([]domain.WorkflowRunLog, 0),
}
}
func (w *workflowInvoker) Invoke(ctx context.Context) error {
ctx = context.WithValue(ctx, "workflow_id", w.workflowId)
ctx = context.WithValue(ctx, "workflow_run_id", w.runId)
return w.processNode(ctx, w.workflowContent)
}
func (w *workflowInvoker) GetLogs() []domain.WorkflowRunLog {
return w.runLogs
}
func (w *workflowInvoker) processNode(ctx context.Context, node *domain.WorkflowNode) error {
current := node
for current != nil {
if ctx.Err() != nil {
return ctx.Err()
}
if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
for _, branch := range current.Branches {
if err := w.processNode(ctx, &branch); err != nil {
// 并行分支的某一分支发生错误时,忽略此错误,继续执行其他分支
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
continue
}
return err
}
}
}
var processor nodes.NodeProcessor
var procErr error
for {
if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
processor, procErr = nodes.GetProcessor(current)
if procErr != nil {
break
}
procErr = processor.Process(ctx)
log := processor.GetLog(ctx)
if log != nil {
w.runLogs = append(w.runLogs, *log)
}
if procErr != nil {
break
}
}
break
}
// TODO: 优化可读性
if procErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch {
return procErr
} else if procErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure)
} else if procErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess)
} else {
current = current.Next
}
}
return nil
}
func (w *workflowInvoker) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode {
for _, branch := range branches {
if branch.Type == nodeType {
return &branch
}
}
return nil
}

View File

@ -0,0 +1,31 @@
package dispatcher
import (
"context"
"sync"
"github.com/usual2970/certimate/internal/domain"
)
type workflowRepository interface {
GetById(ctx context.Context, id string) (*domain.Workflow, error)
Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error)
}
type workflowRunRepository interface {
GetById(ctx context.Context, id string) (*domain.WorkflowRun, error)
Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
}
var (
instance *WorkflowDispatcher
intanceOnce sync.Once
)
func GetSingletonDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
intanceOnce.Do(func() {
instance = newWorkflowDispatcher(workflowRepo, workflowRunRepo)
})
return instance
}

View File

@ -80,7 +80,6 @@ func (n *applyNode) Process(ctx context.Context) error {
certificate.PopulateFromX509(certX509) certificate.PopulateFromX509(certX509)
// 保存执行结果 // 保存执行结果
// TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
output := &domain.WorkflowOutput{ output := &domain.WorkflowOutput{
WorkflowId: getContextWorkflowId(ctx), WorkflowId: getContextWorkflowId(ctx),
RunId: getContextWorkflowRunId(ctx), RunId: getContextWorkflowRunId(ctx),
@ -124,7 +123,7 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24 renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
expirationTime := time.Until(lastCertificate.ExpireAt) expirationTime := time.Until(lastCertificate.ExpireAt)
if expirationTime > renewalInterval { if expirationTime > renewalInterval {
return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(到期尚余 %d 天,预计不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(尚余 %d 天过期,不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays)
} }
} }
} }

View File

@ -1,90 +0,0 @@
package processor
import (
"context"
"github.com/usual2970/certimate/internal/domain"
nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
)
type workflowProcessor struct {
workflowId string
workflowContent *domain.WorkflowNode
runId string
runLogs []domain.WorkflowRunLog
}
func NewWorkflowProcessor(workflowId string, workflowContent *domain.WorkflowNode, workflowRunId string) *workflowProcessor {
return &workflowProcessor{
workflowId: workflowId,
workflowContent: workflowContent,
runId: workflowRunId,
runLogs: make([]domain.WorkflowRunLog, 0),
}
}
func (w *workflowProcessor) Process(ctx context.Context) error {
ctx = context.WithValue(ctx, "workflow_id", w.workflowId)
ctx = context.WithValue(ctx, "workflow_run_id", w.runId)
return w.processNode(ctx, w.workflowContent)
}
func (w *workflowProcessor) GetLogs() []domain.WorkflowRunLog {
return w.runLogs
}
func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error {
current := node
for current != nil {
if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
for _, branch := range current.Branches {
if err := w.processNode(ctx, &branch); err != nil {
continue
}
}
}
var processor nodes.NodeProcessor
var runErr error
for {
if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
processor, runErr = nodes.GetProcessor(current)
if runErr != nil {
break
}
runErr = processor.Process(ctx)
log := processor.GetLog(ctx)
if log != nil {
w.runLogs = append(w.runLogs, *log)
}
if runErr != nil {
break
}
}
break
}
if runErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch {
return runErr
} else if runErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure)
} else if runErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess)
} else {
current = current.Next
}
}
return nil
}
func (w *workflowProcessor) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode {
for _, branch := range branches {
if branch.Type == nodeType {
return &branch
}
}
return nil
}

View File

@ -4,23 +4,14 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/usual2970/certimate/internal/app" "github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/dtos" "github.com/usual2970/certimate/internal/domain/dtos"
processor "github.com/usual2970/certimate/internal/workflow/processor" "github.com/usual2970/certimate/internal/workflow/dispatcher"
) )
const defaultRoutines = 16
type workflowRunData struct {
WorkflowId string
WorkflowContent *domain.WorkflowNode
RunTrigger domain.WorkflowTriggerType
}
type workflowRepository interface { type workflowRepository interface {
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
GetById(ctx context.Context, id string) (*domain.Workflow, error) GetById(ctx context.Context, id string) (*domain.Workflow, error)
@ -33,30 +24,19 @@ type workflowRunRepository interface {
} }
type WorkflowService struct { type WorkflowService struct {
ch chan *workflowRunData dispatcher *dispatcher.WorkflowDispatcher
wg sync.WaitGroup
cancel context.CancelFunc
workflowRepo workflowRepository workflowRepo workflowRepository
workflowRunRepo workflowRunRepository workflowRunRepo workflowRunRepository
} }
func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService { func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService {
ctx, cancel := context.WithCancel(context.Background())
srv := &WorkflowService{ srv := &WorkflowService{
ch: make(chan *workflowRunData, 1), dispatcher: dispatcher.GetSingletonDispatcher(workflowRepo, workflowRunRepo),
cancel: cancel,
workflowRepo: workflowRepo, workflowRepo: workflowRepo,
workflowRunRepo: workflowRunRepo, workflowRunRepo: workflowRunRepo,
} }
srv.wg.Add(defaultRoutines)
for i := 0; i < defaultRoutines; i++ {
go srv.startRun(ctx)
}
return srv return srv
} }
@ -75,7 +55,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
}) })
}) })
if err != nil { if err != nil {
app.GetLogger().Error("failed to add schedule", "err", err)
return err return err
} }
} }
@ -86,7 +65,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error { func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil { if err != nil {
app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
return err return err
} }
@ -94,69 +72,10 @@ func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartR
return errors.New("workflow is already pending or running") return errors.New("workflow is already pending or running")
} }
s.ch <- &workflowRunData{
WorkflowId: workflow.Id,
WorkflowContent: workflow.Content,
RunTrigger: req.RunTrigger,
}
return nil
}
func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil {
app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
return err
}
workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
if err != nil {
app.GetLogger().Error("failed to get workflow run", "id", req.RunId, "err", err)
return err
} else if workflowRun.WorkflowId != workflow.Id {
return errors.New("workflow run not found")
} else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
return errors.New("workflow run is not pending or running")
}
// TODO: 取消运行,防止因为某些原因意外挂起(如进程被杀死)导致工作流一直处于 running 状态无法重新运行
// workflowRun.Status = domain.WorkflowRunStatusTypeCanceled
// workflowRun.EndedAt = time.Now()
// if _, err := s.workflowRunRepo.Save(ctx, workflowRun); err != nil {
// return err
// }
// return nil
return errors.New("TODO: 尚未实现")
}
func (s *WorkflowService) Stop(ctx context.Context) {
s.cancel()
s.wg.Wait()
}
func (s *WorkflowService) startRun(ctx context.Context) {
defer s.wg.Done()
for {
select {
case data := <-s.ch:
if err := s.startRunWithData(ctx, data); err != nil {
app.GetLogger().Error("failed to run workflow", "id", data.WorkflowId, "err", err)
}
case <-ctx.Done():
return
}
}
}
func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRunData) error {
run := &domain.WorkflowRun{ run := &domain.WorkflowRun{
WorkflowId: data.WorkflowId, WorkflowId: workflow.Id,
Status: domain.WorkflowRunStatusTypeRunning, Status: domain.WorkflowRunStatusTypePending,
Trigger: data.RunTrigger, Trigger: req.RunTrigger,
StartedAt: time.Now(), StartedAt: time.Now(),
} }
if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil { if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil {
@ -165,31 +84,35 @@ func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRu
run = resp run = resp
} }
processor := processor.NewWorkflowProcessor(data.WorkflowId, data.WorkflowContent, run.Id) s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{
if runErr := processor.Process(ctx); runErr != nil { WorkflowId: workflow.Id,
run.Status = domain.WorkflowRunStatusTypeFailed WorkflowContent: workflow.Content,
run.EndedAt = time.Now() RunId: run.Id,
run.Logs = processor.GetLogs() })
run.Error = runErr.Error()
if _, err := s.workflowRunRepo.Save(ctx, run); err != nil {
app.GetLogger().Error("failed to save workflow run", "err", err)
}
return fmt.Errorf("failed to run workflow: %w", runErr)
}
run.EndedAt = time.Now()
run.Logs = processor.GetLogs()
run.Error = domain.WorkflowRunLogs(run.Logs).ErrorString()
if run.Error == "" {
run.Status = domain.WorkflowRunStatusTypeSucceeded
} else {
run.Status = domain.WorkflowRunStatusTypeFailed
}
if _, err := s.workflowRunRepo.Save(ctx, run); err != nil {
app.GetLogger().Error("failed to save workflow run", "err", err)
return err
}
return nil return nil
} }
func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil {
return err
}
workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
if err != nil {
return err
} else if workflowRun.WorkflowId != workflow.Id {
return errors.New("workflow run not found")
} else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
return errors.New("workflow run is not pending or running")
}
s.dispatcher.Cancel(workflowRun.Id)
return nil
}
func (s *WorkflowService) Shutdown(ctx context.Context) {
s.dispatcher.Shutdown()
}

View File

@ -5,9 +5,9 @@ import {
ClockCircleOutlined as ClockCircleOutlinedIcon, ClockCircleOutlined as ClockCircleOutlinedIcon,
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon,
PauseOutlined as PauseOutlinedIcon, PauseOutlined as PauseOutlinedIcon,
SelectOutlined as SelectOutlinedIcon, SelectOutlined as SelectOutlinedIcon,
StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
import { useRequest } from "ahooks"; import { useRequest } from "ahooks";
@ -75,7 +75,7 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
); );
} else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) {
return ( return (
<Tag icon={<PauseCircleOutlinedIcon />} color="warning"> <Tag icon={<StopOutlinedIcon />} color="warning">
{t("workflow_run.props.status.canceled")} {t("workflow_run.props.status.canceled")}
</Tag> </Tag>
); );

View File

@ -7,10 +7,10 @@ import {
ClockCircleOutlined as ClockCircleOutlinedIcon, ClockCircleOutlined as ClockCircleOutlinedIcon,
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
LockOutlined as LockOutlinedIcon, LockOutlined as LockOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon,
PlusOutlined as PlusOutlinedIcon, PlusOutlined as PlusOutlinedIcon,
SelectOutlined as SelectOutlinedIcon, SelectOutlined as SelectOutlinedIcon,
SendOutlined as SendOutlinedIcon, SendOutlined as SendOutlinedIcon,
StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
import { PageHeader } from "@ant-design/pro-components"; import { PageHeader } from "@ant-design/pro-components";
@ -89,7 +89,6 @@ const Dashboard = () => {
const workflow = record.expand?.workflowId; const workflow = record.expand?.workflowId;
return ( return (
<Typography.Link <Typography.Link
type="secondary"
ellipsis ellipsis
onClick={() => { onClick={() => {
if (workflow) { if (workflow) {
@ -129,7 +128,7 @@ const Dashboard = () => {
); );
} else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) {
return ( return (
<Tag icon={<PauseCircleOutlinedIcon />} color="warning"> <Tag icon={<StopOutlinedIcon />} color="warning">
{t("workflow_run.props.status.canceled")} {t("workflow_run.props.status.canceled")}
</Tag> </Tag>
); );

View File

@ -42,7 +42,6 @@ const WorkflowDetail = () => {
useZustandShallowSelector(["workflow", "initialized", "init", "destroy", "setEnabled", "release", "discard"]) useZustandShallowSelector(["workflow", "initialized", "init", "destroy", "setEnabled", "release", "discard"])
); );
useEffect(() => { useEffect(() => {
// TODO: loading & error
workflowState.init(workflowId!); workflowState.init(workflowId!);
return () => { return () => {
@ -52,7 +51,7 @@ const WorkflowDetail = () => {
const [tabValue, setTabValue] = useState<"orchestration" | "runs">("orchestration"); const [tabValue, setTabValue] = useState<"orchestration" | "runs">("orchestration");
const [isRunning, setIsRunning] = useState(false); const [isPendingOrRunning, setIsPendingOrRunning] = useState(false);
const lastRunStatus = useMemo(() => workflow.lastRunStatus, [workflow]); const lastRunStatus = useMemo(() => workflow.lastRunStatus, [workflow]);
const [allowDiscard, setAllowDiscard] = useState(false); const [allowDiscard, setAllowDiscard] = useState(false);
@ -60,14 +59,14 @@ const WorkflowDetail = () => {
const [allowRun, setAllowRun] = useState(false); const [allowRun, setAllowRun] = useState(false);
useEffect(() => { useEffect(() => {
setIsRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING); setIsPendingOrRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING);
}, [lastRunStatus]); }, [lastRunStatus]);
useEffect(() => { useEffect(() => {
if (!!workflowId && isRunning) { if (!!workflowId && isPendingOrRunning) {
subscribeWorkflow(workflowId, (e) => { subscribeWorkflow(workflowId, (e) => {
if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) { if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) {
setIsRunning(false); setIsPendingOrRunning(false);
unsubscribeWorkflow(workflowId); unsubscribeWorkflow(workflowId);
} }
}); });
@ -76,15 +75,15 @@ const WorkflowDetail = () => {
unsubscribeWorkflow(workflowId); unsubscribeWorkflow(workflowId);
}; };
} }
}, [workflowId, isRunning]); }, [workflowId, isPendingOrRunning]);
useEffect(() => { useEffect(() => {
const hasReleased = !!workflow.content; const hasReleased = !!workflow.content;
const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content); const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content);
setAllowDiscard(!isRunning && hasReleased && hasChanges); setAllowDiscard(!isPendingOrRunning && hasReleased && hasChanges);
setAllowRelease(!isRunning && hasChanges); setAllowRelease(!isPendingOrRunning && hasChanges);
setAllowRun(hasReleased); setAllowRun(hasReleased);
}, [workflow.content, workflow.draft, workflow.hasDraft, isRunning]); }, [workflow.content, workflow.draft, workflow.hasDraft, isPendingOrRunning]);
const handleEnableChange = async () => { const handleEnableChange = async () => {
if (!workflow.enabled && (!workflow.content || !isAllNodesValidated(workflow.content))) { if (!workflow.enabled && (!workflow.content || !isAllNodesValidated(workflow.content))) {
@ -174,12 +173,12 @@ const WorkflowDetail = () => {
let unsubscribeFn: Awaited<ReturnType<typeof subscribeWorkflow>> | undefined = undefined; let unsubscribeFn: Awaited<ReturnType<typeof subscribeWorkflow>> | undefined = undefined;
try { try {
setIsRunning(true); setIsPendingOrRunning(true);
// subscribe before running workflow // subscribe before running workflow
unsubscribeFn = await subscribeWorkflow(workflowId!, (e) => { unsubscribeFn = await subscribeWorkflow(workflowId!, (e) => {
if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) { if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) {
setIsRunning(false); setIsPendingOrRunning(false);
unsubscribeFn?.(); unsubscribeFn?.();
} }
}); });
@ -188,7 +187,7 @@ const WorkflowDetail = () => {
messageApi.info(t("workflow.detail.orchestration.action.run.prompt")); messageApi.info(t("workflow.detail.orchestration.action.run.prompt"));
} catch (err) { } catch (err) {
setIsRunning(false); setIsPendingOrRunning(false);
unsubscribeFn?.(); unsubscribeFn?.();
console.error(err); console.error(err);
@ -279,7 +278,7 @@ const WorkflowDetail = () => {
</div> </div>
<div className="flex justify-end"> <div className="flex justify-end">
<Space> <Space>
<Button disabled={!allowRun} icon={<CaretRightOutlinedIcon />} loading={isRunning} type="primary" onClick={handleRunClick}> <Button disabled={!allowRun} icon={<CaretRightOutlinedIcon />} loading={isPendingOrRunning} type="primary" onClick={handleRunClick}>
{t("workflow.detail.orchestration.action.run")} {t("workflow.detail.orchestration.action.run")}
</Button> </Button>

View File

@ -7,8 +7,8 @@ import {
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon,
EditOutlined as EditOutlinedIcon, EditOutlined as EditOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon,
PlusOutlined as PlusOutlinedIcon, PlusOutlined as PlusOutlinedIcon,
StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
@ -170,7 +170,7 @@ const WorkflowList = () => {
} else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) { } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) {
icon = <CloseCircleOutlinedIcon style={{ color: themeToken.colorError }} />; icon = <CloseCircleOutlinedIcon style={{ color: themeToken.colorError }} />;
} else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) { } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) {
icon = <PauseCircleOutlinedIcon style={{ color: themeToken.colorWarning }} />; icon = <StopOutlinedIcon style={{ color: themeToken.colorWarning }} />;
} }
return ( return (