From 79c1da6d14b0037bd99fb06a1c9c2828926be8f0 Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Wed, 22 Jan 2025 02:47:56 +0800 Subject: [PATCH] feat: a new status for canceled workflow run --- internal/certificate/service.go | 2 +- internal/domain/workflow_run.go | 15 ++-- internal/pkg/utils/certs/parser.go | 15 ++++ internal/pkg/utils/certs/transformer.go | 3 +- .../workflow/node-processor/apply_node.go | 2 +- internal/workflow/processor/processor.go | 40 ++++----- internal/workflow/service.go | 84 +++++++++---------- migrations/1737479489_updated_workflow.go | 65 ++++++++++++++ migrations/1737479538_updated_workflow_run.go | 65 ++++++++++++++ .../workflow/WorkflowRunDetailDrawer.tsx | 4 +- ui/src/components/workflow/WorkflowRuns.tsx | 13 ++- .../workflow/node/DeployNodeConfigForm.tsx | 11 ++- .../workflow/node/StartNodeConfigForm.tsx | 2 +- ui/src/domain/workflowRun.ts | 1 + .../i18n/locales/en/nls.workflow.nodes.json | 3 +- ui/src/i18n/locales/en/nls.workflow.runs.json | 1 + .../i18n/locales/zh/nls.workflow.nodes.json | 7 +- ui/src/i18n/locales/zh/nls.workflow.runs.json | 5 +- ui/src/pages/dashboard/Dashboard.tsx | 43 ++++++---- ui/src/pages/workflows/WorkflowList.tsx | 3 + 20 files changed, 280 insertions(+), 104 deletions(-) create mode 100644 migrations/1737479489_updated_workflow.go create mode 100644 migrations/1737479538_updated_workflow_run.go diff --git a/internal/certificate/service.go b/internal/certificate/service.go index 1e9dd462..692c2f89 100644 --- a/internal/certificate/service.go +++ b/internal/certificate/service.go @@ -22,8 +22,8 @@ const ( ) type certificateRepository interface { - GetById(ctx context.Context, id string) (*domain.Certificate, error) ListExpireSoon(ctx context.Context) ([]*domain.Certificate, error) + GetById(ctx context.Context, id string) (*domain.Certificate, error) } type CertificateService struct { diff --git a/internal/domain/workflow_run.go b/internal/domain/workflow_run.go index 27ec30de..25ba1d7a 100644 --- a/internal/domain/workflow_run.go +++ b/internal/domain/workflow_run.go @@ -1,6 +1,9 @@ package domain -import "time" +import ( + "strings" + "time" +) const CollectionNameWorkflowRun = "workflow_run" @@ -22,6 +25,7 @@ const ( WorkflowRunStatusTypeRunning WorkflowRunStatusType = "running" WorkflowRunStatusTypeSucceeded WorkflowRunStatusType = "succeeded" WorkflowRunStatusTypeFailed WorkflowRunStatusType = "failed" + WorkflowRunStatusTypeCanceled WorkflowRunStatusType = "canceled" ) type WorkflowRunLog struct { @@ -40,12 +44,13 @@ type WorkflowRunLogOutput struct { type WorkflowRunLogs []WorkflowRunLog -func (r WorkflowRunLogs) FirstError() string { +func (r WorkflowRunLogs) ErrorString() string { + var builder strings.Builder for _, log := range r { if log.Error != "" { - return log.Error + builder.WriteString(log.Error) + builder.WriteString("\n") } } - - return "" + return builder.String() } diff --git a/internal/pkg/utils/certs/parser.go b/internal/pkg/utils/certs/parser.go index d03d6395..89338336 100644 --- a/internal/pkg/utils/certs/parser.go +++ b/internal/pkg/utils/certs/parser.go @@ -1,12 +1,14 @@ package certs import ( + "crypto" "crypto/ecdsa" "crypto/rsa" "crypto/x509" "encoding/pem" "errors" + "github.com/go-acme/lego/v4/certcrypto" xerrors "github.com/pkg/errors" ) @@ -34,6 +36,19 @@ func ParseCertificateFromPEM(certPem string) (cert *x509.Certificate, err error) return cert, nil } +// 从 PEM 编码的私钥字符串解析并返回一个 crypto.PrivateKey 对象。 +// +// 入参: +// - privkeyPem: 私钥 PEM 内容。 +// +// 出参: +// - privkey: crypto.PrivateKey 对象,可能是 rsa.PrivateKey、ecdsa.PrivateKey 或 ed25519.PrivateKey。 +// - err: 错误。 +func ParsePrivateKeyFromPEM(privkeyPem string) (privkey crypto.PrivateKey, err error) { + pemData := []byte(privkeyPem) + return certcrypto.ParsePEMPrivateKey(pemData) +} + // 从 PEM 编码的私钥字符串解析并返回一个 ecdsa.PrivateKey 对象。 // // 入参: diff --git a/internal/pkg/utils/certs/transformer.go b/internal/pkg/utils/certs/transformer.go index cd1a2039..60105d3e 100644 --- a/internal/pkg/utils/certs/transformer.go +++ b/internal/pkg/utils/certs/transformer.go @@ -6,7 +6,6 @@ import ( "errors" "time" - "github.com/go-acme/lego/v4/certcrypto" "github.com/pavlo-v-chernykh/keystore-go/v4" "software.sslmate.com/src/go-pkcs12" ) @@ -27,7 +26,7 @@ func TransformCertificateFromPEMToPFX(certPem string, privkeyPem string, pfxPass return nil, err } - privkey, err := certcrypto.ParsePEMPrivateKey([]byte(privkeyPem)) + privkey, err := ParsePrivateKeyFromPEM(privkeyPem) if err != nil { return nil, err } diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go index 11725d53..5ca379c4 100644 --- a/internal/workflow/node-processor/apply_node.go +++ b/internal/workflow/node-processor/apply_node.go @@ -134,7 +134,7 @@ func (a *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24 expirationTime := time.Until(lastCertificate.ExpireAt) if lastCertificate != nil && expirationTime > renewalInterval { - return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(到期尚余 %d 天,距 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) + return true, fmt.Sprintf("已申请过证书,且证书尚未临近过期(到期尚余 %d 天,预计距 %d 天时续期)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays) } } diff --git a/internal/workflow/processor/processor.go b/internal/workflow/processor/processor.go index f011c152..47663136 100644 --- a/internal/workflow/processor/processor.go +++ b/internal/workflow/processor/processor.go @@ -19,15 +19,15 @@ func NewWorkflowProcessor(workflow *domain.Workflow) *workflowProcessor { } } -func (w *workflowProcessor) Log(ctx context.Context) []domain.WorkflowRunLog { - return w.logs -} - func (w *workflowProcessor) Run(ctx context.Context) error { ctx = setContextWorkflowId(ctx, w.workflow.Id) return w.processNode(ctx, w.workflow.Content) } +func (w *workflowProcessor) GetRunLogs() []domain.WorkflowRunLog { + return w.logs +} + func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error { current := node for current != nil { @@ -39,26 +39,26 @@ func (w *workflowProcessor) processNode(ctx context.Context, node *domain.Workfl } } - var runErr error var processor nodes.NodeProcessor + var runErr error for { - if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch { - break + if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch { + processor, runErr = nodes.GetProcessor(current) + if runErr != nil { + break + } + + runErr = processor.Run(ctx) + log := processor.Log(ctx) + if log != nil { + w.logs = append(w.logs, *log) + } + if runErr != nil { + break + } } - processor, runErr = nodes.GetProcessor(current) - if runErr != nil { - break - } - - runErr = processor.Run(ctx) - log := processor.Log(ctx) - if log != nil { - w.logs = append(w.logs, *log) - } - if runErr != nil { - break - } + break } if runErr != nil && current.Next != nil && current.Next.Type != domain.WorkflowNodeTypeExecuteResultBranch { diff --git a/internal/workflow/service.go b/internal/workflow/service.go index 62f8a888..5a34805e 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -35,35 +35,20 @@ type WorkflowService struct { } func NewWorkflowService(repo workflowRepository) *WorkflowService { - rs := &WorkflowService{ + srv := &WorkflowService{ repo: repo, ch: make(chan *workflowRunData, 1), } ctx, cancel := context.WithCancel(context.Background()) - rs.cancel = cancel + srv.cancel = cancel - rs.wg.Add(defaultRoutines) + srv.wg.Add(defaultRoutines) for i := 0; i < defaultRoutines; i++ { - go rs.process(ctx) + go srv.run(ctx) } - return rs -} - -func (s *WorkflowService) process(ctx context.Context) { - defer s.wg.Done() - for { - select { - case data := <-s.ch: - // 执行 - if err := s.run(ctx, data); err != nil { - app.GetLogger().Error("failed to run workflow", "id", data.Workflow.Id, "err", err) - } - case <-ctx.Done(): - return - } - } + return srv } func (s *WorkflowService) InitSchedule(ctx context.Context) error { @@ -90,7 +75,6 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error { } func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) error { - // 查询 workflow, err := s.repo.GetById(ctx, req.WorkflowId) if err != nil { app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err) @@ -101,9 +85,8 @@ func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) err return errors.New("workflow is running") } - // set last run workflow.LastRunTime = time.Now() - workflow.LastRunStatus = domain.WorkflowRunStatusTypeRunning + workflow.LastRunStatus = domain.WorkflowRunStatusTypePending workflow.LastRunId = "" if err := s.repo.Save(ctx, workflow); err != nil { @@ -118,42 +101,56 @@ func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) err return nil } -func (s *WorkflowService) run(ctx context.Context, runData *workflowRunData) error { - // 执行 +func (s *WorkflowService) Stop(ctx context.Context) { + s.cancel() + s.wg.Wait() +} + +func (s *WorkflowService) run(ctx context.Context) { + defer s.wg.Done() + for { + select { + case data := <-s.ch: + if err := s.runWithData(ctx, data); err != nil { + app.GetLogger().Error("failed to run workflow", "id", data.Workflow.Id, "err", err) + } + case <-ctx.Done(): + return + } + } +} + +func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunData) error { workflow := runData.Workflow + run := &domain.WorkflowRun{ WorkflowId: workflow.Id, Status: domain.WorkflowRunStatusTypeRunning, Trigger: runData.RunTrigger, StartedAt: time.Now(), - EndedAt: time.Now(), } processor := processor.NewWorkflowProcessor(workflow) - if err := processor.Run(ctx); err != nil { + if runErr := processor.Run(ctx); runErr != nil { run.Status = domain.WorkflowRunStatusTypeFailed run.EndedAt = time.Now() - run.Logs = processor.Log(ctx) - run.Error = err.Error() - + run.Logs = processor.GetRunLogs() + run.Error = runErr.Error() if err := s.repo.SaveRun(ctx, run); err != nil { app.GetLogger().Error("failed to save workflow run", "err", err) } - return fmt.Errorf("failed to run workflow: %w", err) + return fmt.Errorf("failed to run workflow: %w", runErr) } - // 保存日志 - logs := processor.Log(ctx) - runStatus := domain.WorkflowRunStatusTypeSucceeded - runError := domain.WorkflowRunLogs(logs).FirstError() - if runError != "" { - runStatus = domain.WorkflowRunStatusTypeFailed - } - run.Status = runStatus run.EndedAt = time.Now() - run.Logs = processor.Log(ctx) - run.Error = runError + run.Logs = processor.GetRunLogs() + run.Error = domain.WorkflowRunLogs(run.Logs).ErrorString() + if run.Error == "" { + run.Status = domain.WorkflowRunStatusTypeSucceeded + } else { + run.Status = domain.WorkflowRunStatusTypeFailed + } if err := s.repo.SaveRun(ctx, run); err != nil { app.GetLogger().Error("failed to save workflow run", "err", err) return err @@ -161,8 +158,3 @@ func (s *WorkflowService) run(ctx context.Context, runData *workflowRunData) err return nil } - -func (s *WorkflowService) Stop(ctx context.Context) { - s.cancel() - s.wg.Wait() -} diff --git a/migrations/1737479489_updated_workflow.go b/migrations/1737479489_updated_workflow.go new file mode 100644 index 00000000..7e06df36 --- /dev/null +++ b/migrations/1737479489_updated_workflow.go @@ -0,0 +1,65 @@ +package migrations + +import ( + "github.com/pocketbase/pocketbase/core" + m "github.com/pocketbase/pocketbase/migrations" +) + +func init() { + m.Register(func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("tovyif5ax6j62ur") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(10, []byte(`{ + "hidden": false, + "id": "zivdxh23", + "maxSelect": 1, + "name": "lastRunStatus", + "presentable": false, + "required": false, + "system": false, + "type": "select", + "values": [ + "pending", + "running", + "succeeded", + "failed", + "canceled" + ] + }`)); err != nil { + return err + } + + return app.Save(collection) + }, func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("tovyif5ax6j62ur") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(10, []byte(`{ + "hidden": false, + "id": "zivdxh23", + "maxSelect": 1, + "name": "lastRunStatus", + "presentable": false, + "required": false, + "system": false, + "type": "select", + "values": [ + "pending", + "running", + "succeeded", + "failed" + ] + }`)); err != nil { + return err + } + + return app.Save(collection) + }) +} diff --git a/migrations/1737479538_updated_workflow_run.go b/migrations/1737479538_updated_workflow_run.go new file mode 100644 index 00000000..b5ff20a7 --- /dev/null +++ b/migrations/1737479538_updated_workflow_run.go @@ -0,0 +1,65 @@ +package migrations + +import ( + "github.com/pocketbase/pocketbase/core" + m "github.com/pocketbase/pocketbase/migrations" +) + +func init() { + m.Register(func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("qjp8lygssgwyqyz") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(2, []byte(`{ + "hidden": false, + "id": "qldmh0tw", + "maxSelect": 1, + "name": "status", + "presentable": false, + "required": false, + "system": false, + "type": "select", + "values": [ + "pending", + "running", + "succeeded", + "failed", + "canceled" + ] + }`)); err != nil { + return err + } + + return app.Save(collection) + }, func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("qjp8lygssgwyqyz") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(2, []byte(`{ + "hidden": false, + "id": "qldmh0tw", + "maxSelect": 1, + "name": "status", + "presentable": false, + "required": false, + "system": false, + "type": "select", + "values": [ + "pending", + "running", + "succeeded", + "failed" + ] + }`)); err != nil { + return err + } + + return app.Save(collection) + }) +} diff --git a/ui/src/components/workflow/WorkflowRunDetailDrawer.tsx b/ui/src/components/workflow/WorkflowRunDetailDrawer.tsx index 5c052aa1..11e2847c 100644 --- a/ui/src/components/workflow/WorkflowRunDetailDrawer.tsx +++ b/ui/src/components/workflow/WorkflowRunDetailDrawer.tsx @@ -41,11 +41,11 @@ const WorkflowRunDetailDrawer = ({ data, loading, trigger, ...props }: WorkflowR
-
+
{data!.logs?.map((item, i) => { return (
-
{item.nodeName}
+
{item.nodeName}
{item.outputs?.map((output, j) => { return ( diff --git a/ui/src/components/workflow/WorkflowRuns.tsx b/ui/src/components/workflow/WorkflowRuns.tsx index 493f6396..788bf1e0 100644 --- a/ui/src/components/workflow/WorkflowRuns.tsx +++ b/ui/src/components/workflow/WorkflowRuns.tsx @@ -5,6 +5,7 @@ import { ClockCircleOutlined as ClockCircleOutlinedIcon, CloseCircleOutlined as CloseCircleOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon, + PauseCircleOutlined as PauseCircleOutlinedIcon, SelectOutlined as SelectOutlinedIcon, SyncOutlined as SyncOutlinedIcon, } from "@ant-design/icons"; @@ -70,6 +71,12 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => { {t("workflow_run.props.status.failed")} ); + } else if (record.status === WORKFLOW_RUN_STATUSES.CANCELED) { + return ( + } color="warning"> + {t("workflow_run.props.status.canceled")} + + ); } return <>; @@ -133,7 +140,11 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => { - - - diff --git a/ui/src/pages/workflows/WorkflowList.tsx b/ui/src/pages/workflows/WorkflowList.tsx index 99eebbbd..28b776e7 100644 --- a/ui/src/pages/workflows/WorkflowList.tsx +++ b/ui/src/pages/workflows/WorkflowList.tsx @@ -7,6 +7,7 @@ import { CloseCircleOutlined as CloseCircleOutlinedIcon, DeleteOutlined as DeleteOutlinedIcon, EditOutlined as EditOutlinedIcon, + PauseCircleOutlined as PauseCircleOutlinedIcon, PlusOutlined as PlusOutlinedIcon, SyncOutlined as SyncOutlinedIcon, } from "@ant-design/icons"; @@ -168,6 +169,8 @@ const WorkflowList = () => { icon = ; } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.FAILED) { icon = ; + } else if (record.lastRunStatus === WORKFLOW_RUN_STATUSES.CANCELED) { + icon = ; } return (