Compare commits

...

3 Commits

Author SHA1 Message Date
Fu Diwei
a74ec95a6a feat(ui): subscribe workflow runs status 2025-02-08 23:08:25 +08:00
Fu Diwei
0bc40fd676 feat: workflow run dispatcher 2025-02-08 23:08:21 +08:00
Fu Diwei
b9e28db089 fix: nil pointer dereference 2025-02-08 23:08:14 +08:00
16 changed files with 516 additions and 238 deletions

View File

@ -69,6 +69,13 @@ func (r *WorkflowRunRepository) Save(ctx context.Context, workflowRun *domain.Wo
workflowRecord, err := txApp.FindRecordById(domain.CollectionNameWorkflow, workflowRun.WorkflowId) workflowRecord, err := txApp.FindRecordById(domain.CollectionNameWorkflow, workflowRun.WorkflowId)
if err != nil { if err != nil {
return err return err
} else if workflowRun.Id == workflowRecord.GetString("lastRunId") {
workflowRecord.IgnoreUnchangedFields(true)
workflowRecord.Set("lastRunStatus", record.GetString("status"))
err = txApp.Save(workflowRecord)
if err != nil {
return err
}
} else if workflowRecord.GetDateTime("lastRunTime").Time().IsZero() || workflowRun.StartedAt.After(workflowRecord.GetDateTime("lastRunTime").Time()) { } else if workflowRecord.GetDateTime("lastRunTime").Time().IsZero() || workflowRun.StartedAt.After(workflowRecord.GetDateTime("lastRunTime").Time()) {
workflowRecord.IgnoreUnchangedFields(true) workflowRecord.IgnoreUnchangedFields(true)
workflowRecord.Set("lastRunId", record.Id) workflowRecord.Set("lastRunId", record.Id)

View File

@ -13,7 +13,7 @@ import (
type workflowService interface { type workflowService interface {
StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error
CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error
Stop(ctx context.Context) Shutdown(ctx context.Context)
} }
type WorkflowHandler struct { type WorkflowHandler struct {

View File

@ -46,6 +46,6 @@ func Register(router *router.Router[*core.RequestEvent]) {
func Unregister() { func Unregister() {
if workflowSvc != nil { if workflowSvc != nil {
workflowSvc.Stop(context.Background()) workflowSvc.Shutdown(context.Background())
} }
} }

View File

@ -0,0 +1,274 @@
package dispatcher
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"sync"
"time"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/pkg/utils/slices"
)
var maxWorkers = 16
func init() {
envMaxWorkers := os.Getenv("CERTIMATE_WORKFLOW_MAX_WORKERS")
if n, err := strconv.Atoi(envMaxWorkers); err != nil && n > 0 {
maxWorkers = n
}
}
type workflowWorker struct {
Data *WorkflowWorkerData
Cancel context.CancelFunc
}
type WorkflowWorkerData struct {
WorkflowId string
WorkflowContent *domain.WorkflowNode
RunId string
}
type WorkflowDispatcher struct {
semaphore chan struct{}
queue []*WorkflowWorkerData
queueMutex sync.Mutex
workers map[string]*workflowWorker // key: WorkflowId
workerIdMap map[string]string // key: RunId, value: WorkflowId
workerMutex sync.Mutex
chWork chan *WorkflowWorkerData
chCandi chan struct{}
wg sync.WaitGroup
workflowRepo workflowRepository
workflowRunRepo workflowRunRepository
}
func newWorkflowDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
dispatcher := &WorkflowDispatcher{
semaphore: make(chan struct{}, maxWorkers),
queue: make([]*WorkflowWorkerData, 0),
queueMutex: sync.Mutex{},
workers: make(map[string]*workflowWorker),
workerIdMap: make(map[string]string),
workerMutex: sync.Mutex{},
chWork: make(chan *WorkflowWorkerData),
chCandi: make(chan struct{}, 1),
workflowRepo: workflowRepo,
workflowRunRepo: workflowRunRepo,
}
go func() {
for {
select {
case <-dispatcher.chWork:
dispatcher.dequeueWorker()
case <-dispatcher.chCandi:
dispatcher.dequeueWorker()
}
}
}()
return dispatcher
}
func (w *WorkflowDispatcher) Dispatch(data *WorkflowWorkerData) {
if data == nil {
panic("worker data is nil")
}
w.enqueueWorker(data)
select {
case w.chWork <- data:
default:
}
}
func (w *WorkflowDispatcher) Cancel(runId string) {
hasWorker := false
// 取消正在执行的 WorkflowRun
w.workerMutex.Lock()
if workflowId, ok := w.workerIdMap[runId]; ok {
if worker, ok := w.workers[workflowId]; ok {
hasWorker = true
worker.Cancel()
delete(w.workers, workflowId)
delete(w.workerIdMap, runId)
}
}
w.workerMutex.Unlock()
// 移除排队中的 WorkflowRun
w.queueMutex.Lock()
w.queue = slices.Filter(w.queue, func(d *WorkflowWorkerData) bool {
return d.RunId != runId
})
w.queueMutex.Unlock()
// 已挂起,查询 WorkflowRun 并更新其状态为 Canceled
if !hasWorker {
if run, err := w.workflowRunRepo.GetById(context.Background(), runId); err == nil {
if run.Status == domain.WorkflowRunStatusTypePending || run.Status == domain.WorkflowRunStatusTypeRunning {
run.Status = domain.WorkflowRunStatusTypeCanceled
w.workflowRunRepo.Save(context.Background(), run)
}
}
}
}
func (w *WorkflowDispatcher) Shutdown() {
// 清空排队中的 WorkflowRun
w.queueMutex.Lock()
w.queue = make([]*WorkflowWorkerData, 0)
w.queueMutex.Unlock()
// 等待所有正在执行的 WorkflowRun 完成
w.wg.Wait()
w.workers = make(map[string]*workflowWorker)
w.workerIdMap = make(map[string]string)
}
func (w *WorkflowDispatcher) enqueueWorker(data *WorkflowWorkerData) {
w.queueMutex.Lock()
defer w.queueMutex.Unlock()
w.queue = append(w.queue, data)
}
func (w *WorkflowDispatcher) dequeueWorker() {
for {
select {
case w.semaphore <- struct{}{}:
default:
// 达到最大并发数
return
}
w.queueMutex.Lock()
if len(w.queue) == 0 {
w.queueMutex.Unlock()
<-w.semaphore
return
}
data := w.queue[0]
w.queue = w.queue[1:]
w.queueMutex.Unlock()
// 检查是否有相同 WorkflowId 的 WorkflowRun 正在执行
// 如果有,则重新排队,以保证同一个工作流同一时间内只有一个正在执行
// 即不同 WorkflowId 的任务并行化,相同 WorkflowId 的任务串行化
w.workerMutex.Lock()
if _, exists := w.workers[data.WorkflowId]; exists {
w.queueMutex.Lock()
w.queue = append(w.queue, data)
w.queueMutex.Unlock()
w.workerMutex.Unlock()
<-w.semaphore
continue
}
ctx, cancel := context.WithCancel(context.Background())
w.workers[data.WorkflowId] = &workflowWorker{data, cancel}
w.workerIdMap[data.RunId] = data.WorkflowId
w.workerMutex.Unlock()
w.wg.Add(1)
go w.work(ctx, data)
}
}
func (w *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) {
defer func() {
<-w.semaphore
w.workerMutex.Lock()
delete(w.workers, data.WorkflowId)
delete(w.workerIdMap, data.RunId)
w.workerMutex.Unlock()
w.wg.Done()
// 尝试取出排队中的其他 WorkflowRun 继续执行
select {
case w.chCandi <- struct{}{}:
default:
}
}()
// 查询 WorkflowRun
run, err := w.workflowRunRepo.GetById(ctx, data.RunId)
if err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err)
}
return
} else if run.Status != domain.WorkflowRunStatusTypePending {
return
} else if ctx.Err() != nil {
run.Status = domain.WorkflowRunStatusTypeCanceled
w.workflowRunRepo.Save(ctx, run)
return
}
// 更新 WorkflowRun 状态为 Running
run.Status = domain.WorkflowRunStatusTypeRunning
if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
panic(err)
}
return
}
// 执行工作流
invoker := newWorkflowInvoker(data)
if runErr := invoker.Invoke(ctx); runErr != nil {
if errors.Is(runErr, context.Canceled) {
run.Status = domain.WorkflowRunStatusTypeCanceled
run.Logs = invoker.GetLogs()
} else {
run.Status = domain.WorkflowRunStatusTypeFailed
run.EndedAt = time.Now()
run.Logs = invoker.GetLogs()
run.Error = runErr.Error()
}
if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
panic(err)
}
}
return
}
// 更新 WorkflowRun 状态为 Succeeded/Failed
run.EndedAt = time.Now()
run.Logs = invoker.GetLogs()
run.Error = domain.WorkflowRunLogs(invoker.GetLogs()).ErrorString()
if run.Error == "" {
run.Status = domain.WorkflowRunStatusTypeSucceeded
} else {
run.Status = domain.WorkflowRunStatusTypeFailed
}
if _, err := w.workflowRunRepo.Save(ctx, run); err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
panic(err)
}
}
}

View File

@ -0,0 +1,104 @@
package dispatcher
import (
"context"
"errors"
"github.com/usual2970/certimate/internal/domain"
nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
)
type workflowInvoker struct {
workflowId string
workflowContent *domain.WorkflowNode
runId string
runLogs []domain.WorkflowRunLog
}
func newWorkflowInvoker(data *WorkflowWorkerData) *workflowInvoker {
if data == nil {
panic("worker data is nil")
}
return &workflowInvoker{
workflowId: data.WorkflowId,
workflowContent: data.WorkflowContent,
runId: data.RunId,
runLogs: make([]domain.WorkflowRunLog, 0),
}
}
func (w *workflowInvoker) Invoke(ctx context.Context) error {
ctx = context.WithValue(ctx, "workflow_id", w.workflowId)
ctx = context.WithValue(ctx, "workflow_run_id", w.runId)
return w.processNode(ctx, w.workflowContent)
}
func (w *workflowInvoker) GetLogs() []domain.WorkflowRunLog {
return w.runLogs
}
func (w *workflowInvoker) processNode(ctx context.Context, node *domain.WorkflowNode) error {
current := node
for current != nil {
if ctx.Err() != nil {
return ctx.Err()
}
if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
for _, branch := range current.Branches {
if err := w.processNode(ctx, &branch); err != nil {
// 并行分支的某一分支发生错误时,忽略此错误,继续执行其他分支
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
continue
}
return err
}
}
}
var processor nodes.NodeProcessor
var procErr error
for {
if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
processor, procErr = nodes.GetProcessor(current)
if procErr != nil {
break
}
procErr = processor.Process(ctx)
log := processor.GetLog(ctx)
if log != nil {
w.runLogs = append(w.runLogs, *log)
}
if procErr != nil {
break
}
}
break
}
// TODO: 优化可读性
if procErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch {
return procErr
} else if procErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure)
} else if procErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess)
} else {
current = current.Next
}
}
return nil
}
func (w *workflowInvoker) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode {
for _, branch := range branches {
if branch.Type == nodeType {
return &branch
}
}
return nil
}

View File

@ -0,0 +1,31 @@
package dispatcher
import (
"context"
"sync"
"github.com/usual2970/certimate/internal/domain"
)
type workflowRepository interface {
GetById(ctx context.Context, id string) (*domain.Workflow, error)
Save(ctx context.Context, workflow *domain.Workflow) (*domain.Workflow, error)
}
type workflowRunRepository interface {
GetById(ctx context.Context, id string) (*domain.WorkflowRun, error)
Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
}
var (
instance *WorkflowDispatcher
intanceOnce sync.Once
)
func GetSingletonDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
intanceOnce.Do(func() {
instance = newWorkflowDispatcher(workflowRepo, workflowRunRepo)
})
return instance
}

View File

@ -80,7 +80,6 @@ func (n *applyNode) Process(ctx context.Context) error {
certificate.PopulateFromX509(certX509) certificate.PopulateFromX509(certX509)
// 保存执行结果 // 保存执行结果
// TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
output := &domain.WorkflowOutput{ output := &domain.WorkflowOutput{
WorkflowId: getContextWorkflowId(ctx), WorkflowId: getContextWorkflowId(ctx),
RunId: getContextWorkflowRunId(ctx), RunId: getContextWorkflowRunId(ctx),
@ -124,7 +123,7 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24 renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
expirationTime := time.Until(lastCertificate.ExpireAt) expirationTime := time.Until(lastCertificate.ExpireAt)
if expirationTime > renewalInterval { if expirationTime > renewalInterval {
return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(到期尚余 %d 天,预计不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(尚余 %d 天过期,不足 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays)
} }
} }
} }

View File

@ -53,7 +53,7 @@ func (n *deployNode) Process(ctx context.Context) error {
} }
// 检测是否可以跳过本次执行 // 检测是否可以跳过本次执行
if certificate.CreatedAt.Before(lastOutput.UpdatedAt) { if lastOutput != nil && certificate.CreatedAt.Before(lastOutput.UpdatedAt) {
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
n.AddOutput(ctx, n.node.Name, skipReason) n.AddOutput(ctx, n.node.Name, skipReason)
return nil return nil

View File

@ -1,90 +0,0 @@
package processor
import (
"context"
"github.com/usual2970/certimate/internal/domain"
nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
)
type workflowProcessor struct {
workflowId string
workflowContent *domain.WorkflowNode
runId string
runLogs []domain.WorkflowRunLog
}
func NewWorkflowProcessor(workflowId string, workflowContent *domain.WorkflowNode, workflowRunId string) *workflowProcessor {
return &workflowProcessor{
workflowId: workflowId,
workflowContent: workflowContent,
runId: workflowRunId,
runLogs: make([]domain.WorkflowRunLog, 0),
}
}
func (w *workflowProcessor) Process(ctx context.Context) error {
ctx = context.WithValue(ctx, "workflow_id", w.workflowId)
ctx = context.WithValue(ctx, "workflow_run_id", w.runId)
return w.processNode(ctx, w.workflowContent)
}
func (w *workflowProcessor) GetLogs() []domain.WorkflowRunLog {
return w.runLogs
}
func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error {
current := node
for current != nil {
if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
for _, branch := range current.Branches {
if err := w.processNode(ctx, &branch); err != nil {
continue
}
}
}
var processor nodes.NodeProcessor
var runErr error
for {
if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
processor, runErr = nodes.GetProcessor(current)
if runErr != nil {
break
}
runErr = processor.Process(ctx)
log := processor.GetLog(ctx)
if log != nil {
w.runLogs = append(w.runLogs, *log)
}
if runErr != nil {
break
}
}
break
}
if runErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch {
return runErr
} else if runErr != nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteFailure)
} else if runErr == nil && current.Next != nil && current.Next.Type == domain.WorkflowNodeTypeExecuteResultBranch {
current = w.getBranchByType(current.Next.Branches, domain.WorkflowNodeTypeExecuteSuccess)
} else {
current = current.Next
}
}
return nil
}
func (w *workflowProcessor) getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode {
for _, branch := range branches {
if branch.Type == nodeType {
return &branch
}
}
return nil
}

View File

@ -4,23 +4,14 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/usual2970/certimate/internal/app" "github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain" "github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/dtos" "github.com/usual2970/certimate/internal/domain/dtos"
processor "github.com/usual2970/certimate/internal/workflow/processor" "github.com/usual2970/certimate/internal/workflow/dispatcher"
) )
const defaultRoutines = 16
type workflowRunData struct {
WorkflowId string
WorkflowContent *domain.WorkflowNode
RunTrigger domain.WorkflowTriggerType
}
type workflowRepository interface { type workflowRepository interface {
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error) ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
GetById(ctx context.Context, id string) (*domain.Workflow, error) GetById(ctx context.Context, id string) (*domain.Workflow, error)
@ -33,30 +24,19 @@ type workflowRunRepository interface {
} }
type WorkflowService struct { type WorkflowService struct {
ch chan *workflowRunData dispatcher *dispatcher.WorkflowDispatcher
wg sync.WaitGroup
cancel context.CancelFunc
workflowRepo workflowRepository workflowRepo workflowRepository
workflowRunRepo workflowRunRepository workflowRunRepo workflowRunRepository
} }
func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService { func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService {
ctx, cancel := context.WithCancel(context.Background())
srv := &WorkflowService{ srv := &WorkflowService{
ch: make(chan *workflowRunData, 1), dispatcher: dispatcher.GetSingletonDispatcher(workflowRepo, workflowRunRepo),
cancel: cancel,
workflowRepo: workflowRepo, workflowRepo: workflowRepo,
workflowRunRepo: workflowRunRepo, workflowRunRepo: workflowRunRepo,
} }
srv.wg.Add(defaultRoutines)
for i := 0; i < defaultRoutines; i++ {
go srv.startRun(ctx)
}
return srv return srv
} }
@ -75,7 +55,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
}) })
}) })
if err != nil { if err != nil {
app.GetLogger().Error("failed to add schedule", "err", err)
return err return err
} }
} }
@ -86,7 +65,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error { func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId) workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil { if err != nil {
app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
return err return err
} }
@ -94,69 +72,10 @@ func (s *WorkflowService) StartRun(ctx context.Context, req *dtos.WorkflowStartR
return errors.New("workflow is already pending or running") return errors.New("workflow is already pending or running")
} }
s.ch <- &workflowRunData{
WorkflowId: workflow.Id,
WorkflowContent: workflow.Content,
RunTrigger: req.RunTrigger,
}
return nil
}
func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil {
app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
return err
}
workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
if err != nil {
app.GetLogger().Error("failed to get workflow run", "id", req.RunId, "err", err)
return err
} else if workflowRun.WorkflowId != workflow.Id {
return errors.New("workflow run not found")
} else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
return errors.New("workflow run is not pending or running")
}
// TODO: 取消运行,防止因为某些原因意外挂起(如进程被杀死)导致工作流一直处于 running 状态无法重新运行
// workflowRun.Status = domain.WorkflowRunStatusTypeCanceled
// workflowRun.EndedAt = time.Now()
// if _, err := s.workflowRunRepo.Save(ctx, workflowRun); err != nil {
// return err
// }
// return nil
return errors.New("TODO: 尚未实现")
}
func (s *WorkflowService) Stop(ctx context.Context) {
s.cancel()
s.wg.Wait()
}
func (s *WorkflowService) startRun(ctx context.Context) {
defer s.wg.Done()
for {
select {
case data := <-s.ch:
if err := s.startRunWithData(ctx, data); err != nil {
app.GetLogger().Error("failed to run workflow", "id", data.WorkflowId, "err", err)
}
case <-ctx.Done():
return
}
}
}
func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRunData) error {
run := &domain.WorkflowRun{ run := &domain.WorkflowRun{
WorkflowId: data.WorkflowId, WorkflowId: workflow.Id,
Status: domain.WorkflowRunStatusTypeRunning, Status: domain.WorkflowRunStatusTypePending,
Trigger: data.RunTrigger, Trigger: req.RunTrigger,
StartedAt: time.Now(), StartedAt: time.Now(),
} }
if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil { if resp, err := s.workflowRunRepo.Save(ctx, run); err != nil {
@ -165,31 +84,35 @@ func (s *WorkflowService) startRunWithData(ctx context.Context, data *workflowRu
run = resp run = resp
} }
processor := processor.NewWorkflowProcessor(data.WorkflowId, data.WorkflowContent, run.Id) s.dispatcher.Dispatch(&dispatcher.WorkflowWorkerData{
if runErr := processor.Process(ctx); runErr != nil { WorkflowId: workflow.Id,
run.Status = domain.WorkflowRunStatusTypeFailed WorkflowContent: workflow.Content,
run.EndedAt = time.Now() RunId: run.Id,
run.Logs = processor.GetLogs() })
run.Error = runErr.Error()
if _, err := s.workflowRunRepo.Save(ctx, run); err != nil {
app.GetLogger().Error("failed to save workflow run", "err", err)
}
return fmt.Errorf("failed to run workflow: %w", runErr)
}
run.EndedAt = time.Now()
run.Logs = processor.GetLogs()
run.Error = domain.WorkflowRunLogs(run.Logs).ErrorString()
if run.Error == "" {
run.Status = domain.WorkflowRunStatusTypeSucceeded
} else {
run.Status = domain.WorkflowRunStatusTypeFailed
}
if _, err := s.workflowRunRepo.Save(ctx, run); err != nil {
app.GetLogger().Error("failed to save workflow run", "err", err)
return err
}
return nil return nil
} }
func (s *WorkflowService) CancelRun(ctx context.Context, req *dtos.WorkflowCancelRunReq) error {
workflow, err := s.workflowRepo.GetById(ctx, req.WorkflowId)
if err != nil {
return err
}
workflowRun, err := s.workflowRunRepo.GetById(ctx, req.RunId)
if err != nil {
return err
} else if workflowRun.WorkflowId != workflow.Id {
return errors.New("workflow run not found")
} else if workflowRun.Status != domain.WorkflowRunStatusTypePending && workflowRun.Status != domain.WorkflowRunStatusTypeRunning {
return errors.New("workflow run is not pending or running")
}
s.dispatcher.Cancel(workflowRun.Id)
return nil
}
func (s *WorkflowService) Shutdown(ctx context.Context) {
s.dispatcher.Shutdown()
}

View File

@ -1,13 +1,13 @@
import { useState } from "react"; import { useEffect, useState } from "react";
import { useTranslation } from "react-i18next"; import { useTranslation } from "react-i18next";
import { import {
CheckCircleOutlined as CheckCircleOutlinedIcon, CheckCircleOutlined as CheckCircleOutlinedIcon,
ClockCircleOutlined as ClockCircleOutlinedIcon, ClockCircleOutlined as ClockCircleOutlinedIcon,
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon,
PauseOutlined as PauseOutlinedIcon, PauseOutlined as PauseOutlinedIcon,
SelectOutlined as SelectOutlinedIcon, SelectOutlined as SelectOutlinedIcon,
StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
import { useRequest } from "ahooks"; import { useRequest } from "ahooks";
@ -18,7 +18,12 @@ import { ClientResponseError } from "pocketbase";
import { cancelRun as cancelWorkflowRun } from "@/api/workflows"; import { cancelRun as cancelWorkflowRun } from "@/api/workflows";
import { WORKFLOW_TRIGGERS } from "@/domain/workflow"; import { WORKFLOW_TRIGGERS } from "@/domain/workflow";
import { WORKFLOW_RUN_STATUSES, type WorkflowRunModel } from "@/domain/workflowRun"; import { WORKFLOW_RUN_STATUSES, type WorkflowRunModel } from "@/domain/workflowRun";
import { list as listWorkflowRuns, remove as removeWorkflowRun } from "@/repository/workflowRun"; import {
list as listWorkflowRuns,
remove as removeWorkflowRun,
subscribe as subscribeWorkflowRun,
unsubscribe as unsubscribeWorkflowRun,
} from "@/repository/workflowRun";
import { getErrMsg } from "@/utils/error"; import { getErrMsg } from "@/utils/error";
import WorkflowRunDetailDrawer from "./WorkflowRunDetailDrawer"; import WorkflowRunDetailDrawer from "./WorkflowRunDetailDrawer";
@ -75,7 +80,7 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
); );
} else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) {
return ( return (
<Tag icon={<PauseCircleOutlinedIcon />} color="warning"> <Tag icon={<StopOutlinedIcon />} color="warning">
{t("workflow_run.props.status.canceled")} {t("workflow_run.props.status.canceled")}
</Tag> </Tag>
); );
@ -211,6 +216,27 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
} }
); );
useEffect(() => {
const items = tableData.filter((e) => e.status === WORKFLOW_RUN_STATUSES.PENDING || e.status === WORKFLOW_RUN_STATUSES.RUNNING);
for (const item of items) {
subscribeWorkflowRun(item.id, (cb) => {
setTableData((prev) => {
const index = prev.findIndex((e) => e.id === item.id);
if (index !== -1) {
prev[index] = cb.record;
}
return [...prev];
});
});
}
return () => {
for (const item of items) {
unsubscribeWorkflowRun(item.id);
}
};
}, [tableData]);
const handleCancelClick = (workflowRun: WorkflowRunModel) => { const handleCancelClick = (workflowRun: WorkflowRunModel) => {
modalApi.confirm({ modalApi.confirm({
title: t("workflow_run.action.cancel"), title: t("workflow_run.action.cancel"),

View File

@ -7,10 +7,10 @@ import {
ClockCircleOutlined as ClockCircleOutlinedIcon, ClockCircleOutlined as ClockCircleOutlinedIcon,
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
LockOutlined as LockOutlinedIcon, LockOutlined as LockOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon,
PlusOutlined as PlusOutlinedIcon, PlusOutlined as PlusOutlinedIcon,
SelectOutlined as SelectOutlinedIcon, SelectOutlined as SelectOutlinedIcon,
SendOutlined as SendOutlinedIcon, SendOutlined as SendOutlinedIcon,
StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
import { PageHeader } from "@ant-design/pro-components"; import { PageHeader } from "@ant-design/pro-components";
@ -89,7 +89,6 @@ const Dashboard = () => {
const workflow = record.expand?.workflowId; const workflow = record.expand?.workflowId;
return ( return (
<Typography.Link <Typography.Link
type="secondary"
ellipsis ellipsis
onClick={() => { onClick={() => {
if (workflow) { if (workflow) {
@ -129,7 +128,7 @@ const Dashboard = () => {
); );
} else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) {
return ( return (
<Tag icon={<PauseCircleOutlinedIcon />} color="warning"> <Tag icon={<StopOutlinedIcon />} color="warning">
{t("workflow_run.props.status.canceled")} {t("workflow_run.props.status.canceled")}
</Tag> </Tag>
); );

View File

@ -42,7 +42,6 @@ const WorkflowDetail = () => {
useZustandShallowSelector(["workflow", "initialized", "init", "destroy", "setEnabled", "release", "discard"]) useZustandShallowSelector(["workflow", "initialized", "init", "destroy", "setEnabled", "release", "discard"])
); );
useEffect(() => { useEffect(() => {
// TODO: loading & error
workflowState.init(workflowId!); workflowState.init(workflowId!);
return () => { return () => {
@ -52,7 +51,7 @@ const WorkflowDetail = () => {
const [tabValue, setTabValue] = useState<"orchestration" | "runs">("orchestration"); const [tabValue, setTabValue] = useState<"orchestration" | "runs">("orchestration");
const [isRunning, setIsRunning] = useState(false); const [isPendingOrRunning, setIsPendingOrRunning] = useState(false);
const lastRunStatus = useMemo(() => workflow.lastRunStatus, [workflow]); const lastRunStatus = useMemo(() => workflow.lastRunStatus, [workflow]);
const [allowDiscard, setAllowDiscard] = useState(false); const [allowDiscard, setAllowDiscard] = useState(false);
@ -60,14 +59,14 @@ const WorkflowDetail = () => {
const [allowRun, setAllowRun] = useState(false); const [allowRun, setAllowRun] = useState(false);
useEffect(() => { useEffect(() => {
setIsRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING); setIsPendingOrRunning(lastRunStatus == WORKFLOW_RUN_STATUSES.PENDING || lastRunStatus == WORKFLOW_RUN_STATUSES.RUNNING);
}, [lastRunStatus]); }, [lastRunStatus]);
useEffect(() => { useEffect(() => {
if (!!workflowId && isRunning) { if (!!workflowId && isPendingOrRunning) {
subscribeWorkflow(workflowId, (e) => { subscribeWorkflow(workflowId, (cb) => {
if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) { if (cb.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && cb.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) {
setIsRunning(false); setIsPendingOrRunning(false);
unsubscribeWorkflow(workflowId); unsubscribeWorkflow(workflowId);
} }
}); });
@ -76,15 +75,15 @@ const WorkflowDetail = () => {
unsubscribeWorkflow(workflowId); unsubscribeWorkflow(workflowId);
}; };
} }
}, [workflowId, isRunning]); }, [workflowId, isPendingOrRunning]);
useEffect(() => { useEffect(() => {
const hasReleased = !!workflow.content; const hasReleased = !!workflow.content;
const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content); const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content);
setAllowDiscard(!isRunning && hasReleased && hasChanges); setAllowDiscard(!isPendingOrRunning && hasReleased && hasChanges);
setAllowRelease(!isRunning && hasChanges); setAllowRelease(!isPendingOrRunning && hasChanges);
setAllowRun(hasReleased); setAllowRun(hasReleased);
}, [workflow.content, workflow.draft, workflow.hasDraft, isRunning]); }, [workflow.content, workflow.draft, workflow.hasDraft, isPendingOrRunning]);
const handleEnableChange = async () => { const handleEnableChange = async () => {
if (!workflow.enabled && (!workflow.content || !isAllNodesValidated(workflow.content))) { if (!workflow.enabled && (!workflow.content || !isAllNodesValidated(workflow.content))) {
@ -174,12 +173,12 @@ const WorkflowDetail = () => {
let unsubscribeFn: Awaited<ReturnType<typeof subscribeWorkflow>> | undefined = undefined; let unsubscribeFn: Awaited<ReturnType<typeof subscribeWorkflow>> | undefined = undefined;
try { try {
setIsRunning(true); setIsPendingOrRunning(true);
// subscribe before running workflow // subscribe before running workflow
unsubscribeFn = await subscribeWorkflow(workflowId!, (e) => { unsubscribeFn = await subscribeWorkflow(workflowId!, (e) => {
if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) { if (e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.PENDING && e.record.lastRunStatus !== WORKFLOW_RUN_STATUSES.RUNNING) {
setIsRunning(false); setIsPendingOrRunning(false);
unsubscribeFn?.(); unsubscribeFn?.();
} }
}); });
@ -188,7 +187,7 @@ const WorkflowDetail = () => {
messageApi.info(t("workflow.detail.orchestration.action.run.prompt")); messageApi.info(t("workflow.detail.orchestration.action.run.prompt"));
} catch (err) { } catch (err) {
setIsRunning(false); setIsPendingOrRunning(false);
unsubscribeFn?.(); unsubscribeFn?.();
console.error(err); console.error(err);
@ -279,7 +278,7 @@ const WorkflowDetail = () => {
</div> </div>
<div className="flex justify-end"> <div className="flex justify-end">
<Space> <Space>
<Button disabled={!allowRun} icon={<CaretRightOutlinedIcon />} loading={isRunning} type="primary" onClick={handleRunClick}> <Button disabled={!allowRun} icon={<CaretRightOutlinedIcon />} loading={isPendingOrRunning} type="primary" onClick={handleRunClick}>
{t("workflow.detail.orchestration.action.run")} {t("workflow.detail.orchestration.action.run")}
</Button> </Button>

View File

@ -7,8 +7,8 @@ import {
CloseCircleOutlined as CloseCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon,
DeleteOutlined as DeleteOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon,
EditOutlined as EditOutlinedIcon, EditOutlined as EditOutlinedIcon,
PauseCircleOutlined as PauseCircleOutlinedIcon,
PlusOutlined as PlusOutlinedIcon, PlusOutlined as PlusOutlinedIcon,
StopOutlined as StopOutlinedIcon,
SyncOutlined as SyncOutlinedIcon, SyncOutlined as SyncOutlinedIcon,
} from "@ant-design/icons"; } from "@ant-design/icons";
@ -170,7 +170,7 @@ const WorkflowList = () => {
} else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) { } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) {
icon = <CloseCircleOutlinedIcon style={{ color: themeToken.colorError }} />; icon = <CloseCircleOutlinedIcon style={{ color: themeToken.colorError }} />;
} else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) { } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) {
icon = <PauseCircleOutlinedIcon style={{ color: themeToken.colorWarning }} />; icon = <StopOutlinedIcon style={{ color: themeToken.colorWarning }} />;
} }
return ( return (

View File

@ -50,13 +50,9 @@ export const remove = async (record: MaybeModelRecordWithId<WorkflowModel>) => {
}; };
export const subscribe = async (id: string, cb: (e: RecordSubscription<WorkflowModel>) => void) => { export const subscribe = async (id: string, cb: (e: RecordSubscription<WorkflowModel>) => void) => {
const pb = getPocketBase(); return getPocketBase().collection(COLLECTION_NAME).subscribe(id, cb);
return pb.collection("workflow").subscribe(id, cb);
}; };
export const unsubscribe = async (id: string) => { export const unsubscribe = async (id: string) => {
const pb = getPocketBase(); return getPocketBase().collection(COLLECTION_NAME).unsubscribe(id);
return pb.collection("workflow").unsubscribe(id);
}; };

View File

@ -1,4 +1,6 @@
import { type WorkflowRunModel } from "@/domain/workflowRun"; import { type RecordSubscription } from "pocketbase";
import { type WorkflowRunModel } from "@/domain/workflowRun";
import { getPocketBase } from "./_pocketbase"; import { getPocketBase } from "./_pocketbase";
@ -35,3 +37,11 @@ export const list = async (request: ListWorkflowRunsRequest) => {
export const remove = async (record: MaybeModelRecordWithId<WorkflowRunModel>) => { export const remove = async (record: MaybeModelRecordWithId<WorkflowRunModel>) => {
return await getPocketBase().collection(COLLECTION_NAME).delete(record.id); return await getPocketBase().collection(COLLECTION_NAME).delete(record.id);
}; };
export const subscribe = async (id: string, cb: (e: RecordSubscription<WorkflowRunModel>) => void) => {
return getPocketBase().collection(COLLECTION_NAME).subscribe(id, cb);
};
export const unsubscribe = async (id: string) => {
return getPocketBase().collection(COLLECTION_NAME).unsubscribe(id);
};