From d32fce98ae7f0a3a0fcfec6ba467e2e7791b49f5 Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Thu, 6 Feb 2025 19:56:41 +0800 Subject: [PATCH] feat: save related runId in certificates or workflow outputs --- internal/domain/certificate.go | 1 + internal/domain/workflow_output.go | 1 + internal/repository/certificate.go | 2 + internal/repository/workflow_output.go | 9 ++- .../workflow/node-processor/apply_node.go | 9 ++- .../workflow/node-processor/deploy_node.go | 14 ++-- .../workflow/node-processor/notify_node.go | 10 +-- internal/workflow/node-processor/processor.go | 8 ++- .../workflow/node-processor/upload_node.go | 39 ++++++++++- internal/workflow/processor/processor.go | 25 ++++--- internal/workflow/service.go | 2 +- migrations/1738839725_updated_certificate.go | 67 +++++++++++++++++++ .../1738840633_updated_workflow_output.go | 63 +++++++++++++++++ 13 files changed, 215 insertions(+), 35 deletions(-) create mode 100644 migrations/1738839725_updated_certificate.go create mode 100644 migrations/1738840633_updated_workflow_output.go diff --git a/internal/domain/certificate.go b/internal/domain/certificate.go index 1abecb9c..f57a2c4a 100644 --- a/internal/domain/certificate.go +++ b/internal/domain/certificate.go @@ -27,6 +27,7 @@ type Certificate struct { ACMECertStableUrl string `json:"acmeCertStableUrl" db:"acmeCertStableUrl"` WorkflowId string `json:"workflowId" db:"workflowId"` WorkflowNodeId string `json:"workflowNodeId" db:"workflowNodeId"` + WorkflowRunId string `json:"workflowRunId" db:"workflowRunId"` WorkflowOutputId string `json:"workflowOutputId" db:"workflowOutputId"` DeletedAt *time.Time `json:"deleted" db:"deleted"` } diff --git a/internal/domain/workflow_output.go b/internal/domain/workflow_output.go index 44a70bc2..57af169b 100644 --- a/internal/domain/workflow_output.go +++ b/internal/domain/workflow_output.go @@ -5,6 +5,7 @@ const CollectionNameWorkflowOutput = "workflow_output" type WorkflowOutput struct { Meta WorkflowId string `json:"workflowId" db:"workflow"` + RunId string `json:"runId" db:"runId"` NodeId string `json:"nodeId" db:"nodeId"` Node *WorkflowNode `json:"node" db:"node"` Outputs []WorkflowNodeIO `json:"outputs" db:"outputs"` diff --git a/internal/repository/certificate.go b/internal/repository/certificate.go index db0e2b4c..0695ca47 100644 --- a/internal/repository/certificate.go +++ b/internal/repository/certificate.go @@ -112,6 +112,7 @@ func (r *CertificateRepository) Save(ctx context.Context, certificate *domain.Ce record.Set("acmeCertUrl", certificate.ACMECertUrl) record.Set("acmeCertStableUrl", certificate.ACMECertStableUrl) record.Set("workflowId", certificate.WorkflowId) + record.Set("workflowRunId", certificate.WorkflowRunId) record.Set("workflowNodeId", certificate.WorkflowNodeId) record.Set("workflowOutputId", certificate.WorkflowOutputId) if err := app.GetApp().Save(record); err != nil { @@ -149,6 +150,7 @@ func (r *CertificateRepository) castRecordToModel(record *core.Record) (*domain. ACMECertUrl: record.GetString("acmeCertUrl"), ACMECertStableUrl: record.GetString("acmeCertStableUrl"), WorkflowId: record.GetString("workflowId"), + WorkflowRunId: record.GetString("workflowRunId"), WorkflowNodeId: record.GetString("workflowNodeId"), WorkflowOutputId: record.GetString("workflowOutputId"), } diff --git a/internal/repository/workflow_output.go b/internal/repository/workflow_output.go index f5965396..1adf0c8c 100644 --- a/internal/repository/workflow_output.go +++ b/internal/repository/workflow_output.go @@ -40,7 +40,7 @@ func (r *WorkflowOutputRepository) GetByNodeId(ctx context.Context, workflowNode } func (r *WorkflowOutputRepository) Save(ctx context.Context, workflowOutput *domain.WorkflowOutput) (*domain.WorkflowOutput, error) { - record, err := r.saveRecord(ctx, workflowOutput) + record, err := r.saveRecord(workflowOutput) if err != nil { return workflowOutput, err } @@ -52,7 +52,7 @@ func (r *WorkflowOutputRepository) Save(ctx context.Context, workflowOutput *dom } func (r *WorkflowOutputRepository) SaveWithCertificate(ctx context.Context, workflowOutput *domain.WorkflowOutput, certificate *domain.Certificate) (*domain.WorkflowOutput, error) { - record, err := r.saveRecord(ctx, workflowOutput) + record, err := r.saveRecord(workflowOutput) if err != nil { return workflowOutput, err } else { @@ -63,6 +63,7 @@ func (r *WorkflowOutputRepository) SaveWithCertificate(ctx context.Context, work if certificate != nil { certificate.WorkflowId = workflowOutput.WorkflowId + certificate.WorkflowRunId = workflowOutput.RunId certificate.WorkflowNodeId = workflowOutput.NodeId certificate.WorkflowOutputId = workflowOutput.Id certificate, err := NewCertificateRepository().Save(ctx, certificate) @@ -108,6 +109,7 @@ func (r *WorkflowOutputRepository) castRecordToModel(record *core.Record) (*doma UpdatedAt: record.GetDateTime("updated").Time(), }, WorkflowId: record.GetString("workflowId"), + RunId: record.GetString("runId"), NodeId: record.GetString("nodeId"), Node: node, Outputs: outputs, @@ -116,7 +118,7 @@ func (r *WorkflowOutputRepository) castRecordToModel(record *core.Record) (*doma return workflowOutput, nil } -func (r *WorkflowOutputRepository) saveRecord(ctx context.Context, output *domain.WorkflowOutput) (*core.Record, error) { +func (r *WorkflowOutputRepository) saveRecord(output *domain.WorkflowOutput) (*core.Record, error) { collection, err := app.GetApp().FindCollectionByNameOrId(domain.CollectionNameWorkflowOutput) if err != nil { return nil, err @@ -132,6 +134,7 @@ func (r *WorkflowOutputRepository) saveRecord(ctx context.Context, output *domai } } record.Set("workflowId", output.WorkflowId) + record.Set("runId", output.RunId) record.Set("nodeId", output.NodeId) record.Set("node", output.Node) record.Set("outputs", output.Outputs) diff --git a/internal/workflow/node-processor/apply_node.go b/internal/workflow/node-processor/apply_node.go index d600ebb1..0fdf268f 100644 --- a/internal/workflow/node-processor/apply_node.go +++ b/internal/workflow/node-processor/apply_node.go @@ -14,18 +14,20 @@ import ( ) type applyNode struct { - node *domain.WorkflowNode + node *domain.WorkflowNode + *nodeLogger + certRepo certificateRepository outputRepo workflowOutputRepository - *nodeLogger } func NewApplyNode(node *domain.WorkflowNode) *applyNode { return &applyNode{ node: node, nodeLogger: NewNodeLogger(node), - outputRepo: repository.NewWorkflowOutputRepository(), + certRepo: repository.NewCertificateRepository(), + outputRepo: repository.NewWorkflowOutputRepository(), } } @@ -81,6 +83,7 @@ func (n *applyNode) Run(ctx context.Context) error { // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制 currentOutput := &domain.WorkflowOutput{ WorkflowId: getContextWorkflowId(ctx), + RunId: getContextWorkflowRunId(ctx), NodeId: n.node.Id, Node: n.node, Succeeded: true, diff --git a/internal/workflow/node-processor/deploy_node.go b/internal/workflow/node-processor/deploy_node.go index 28acadb0..f5379519 100644 --- a/internal/workflow/node-processor/deploy_node.go +++ b/internal/workflow/node-processor/deploy_node.go @@ -12,18 +12,20 @@ import ( ) type deployNode struct { - node *domain.WorkflowNode + node *domain.WorkflowNode + *nodeLogger + certRepo certificateRepository outputRepo workflowOutputRepository - *nodeLogger } func NewDeployNode(node *domain.WorkflowNode) *deployNode { return &deployNode{ node: node, nodeLogger: NewNodeLogger(node), - outputRepo: repository.NewWorkflowOutputRepository(), + certRepo: repository.NewCertificateRepository(), + outputRepo: repository.NewWorkflowOutputRepository(), } } @@ -61,7 +63,7 @@ func (n *deployNode) Run(ctx context.Context) error { } // 初始化部署器 - deploy, err := deployer.NewWithDeployNode(n.node, struct { + deployer, err := deployer.NewWithDeployNode(n.node, struct { Certificate string PrivateKey string }{Certificate: certificate.Certificate, PrivateKey: certificate.PrivateKey}) @@ -71,7 +73,7 @@ func (n *deployNode) Run(ctx context.Context) error { } // 部署证书 - if err := deploy.Deploy(ctx); err != nil { + if err := deployer.Deploy(ctx); err != nil { n.AddOutput(ctx, n.node.Name, "部署失败", err.Error()) return err } @@ -80,8 +82,8 @@ func (n *deployNode) Run(ctx context.Context) error { // 保存执行结果 // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制 currentOutput := &domain.WorkflowOutput{ - Meta: domain.Meta{}, WorkflowId: getContextWorkflowId(ctx), + RunId: getContextWorkflowRunId(ctx), NodeId: n.node.Id, Node: n.node, Succeeded: true, diff --git a/internal/workflow/node-processor/notify_node.go b/internal/workflow/node-processor/notify_node.go index 0ba5eb1f..052ebda7 100644 --- a/internal/workflow/node-processor/notify_node.go +++ b/internal/workflow/node-processor/notify_node.go @@ -9,15 +9,17 @@ import ( ) type notifyNode struct { - node *domain.WorkflowNode - settingsRepo settingsRepository + node *domain.WorkflowNode *nodeLogger + + settingsRepo settingsRepository } func NewNotifyNode(node *domain.WorkflowNode) *notifyNode { return ¬ifyNode{ - node: node, - nodeLogger: NewNodeLogger(node), + node: node, + nodeLogger: NewNodeLogger(node), + settingsRepo: repository.NewSettingsRepository(), } } diff --git a/internal/workflow/node-processor/processor.go b/internal/workflow/node-processor/processor.go index bf2b12f4..33e82e3b 100644 --- a/internal/workflow/node-processor/processor.go +++ b/internal/workflow/node-processor/processor.go @@ -10,7 +10,7 @@ import ( type NodeProcessor interface { Run(ctx context.Context) error - Log(ctx context.Context) *domain.WorkflowRunLog + GetLog(ctx context.Context) *domain.WorkflowRunLog AddOutput(ctx context.Context, title, content string, err ...string) } @@ -42,7 +42,7 @@ func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger { } } -func (l *nodeLogger) Log(ctx context.Context) *domain.WorkflowRunLog { +func (l *nodeLogger) GetLog(ctx context.Context) *domain.WorkflowRunLog { return l.log } @@ -84,3 +84,7 @@ func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) { func getContextWorkflowId(ctx context.Context) string { return ctx.Value("workflow_id").(string) } + +func getContextWorkflowRunId(ctx context.Context) string { + return ctx.Value("workflow_run_id").(string) +} diff --git a/internal/workflow/node-processor/upload_node.go b/internal/workflow/node-processor/upload_node.go index 7b1908d9..ed9ecd21 100644 --- a/internal/workflow/node-processor/upload_node.go +++ b/internal/workflow/node-processor/upload_node.go @@ -3,6 +3,7 @@ package nodeprocessor import ( "context" "errors" + "strings" "time" "github.com/usual2970/certimate/internal/domain" @@ -11,15 +12,19 @@ import ( ) type uploadNode struct { - node *domain.WorkflowNode - outputRepo workflowOutputRepository + node *domain.WorkflowNode *nodeLogger + + certRepo certificateRepository + outputRepo workflowOutputRepository } func NewUploadNode(node *domain.WorkflowNode) *uploadNode { return &uploadNode{ node: node, nodeLogger: NewNodeLogger(node), + + certRepo: repository.NewCertificateRepository(), outputRepo: repository.NewWorkflowOutputRepository(), } } @@ -38,6 +43,12 @@ func (n *uploadNode) Run(ctx context.Context) error { return err } + // 检测是否可以跳过本次执行 + if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable { + n.AddOutput(ctx, n.node.Name, skipReason) + return nil + } + // 检查证书是否过期 // 如果证书过期,则直接返回错误 certX509, err := certs.ParseCertificateFromPEM(nodeConfig.Certificate) @@ -50,7 +61,7 @@ func (n *uploadNode) Run(ctx context.Context) error { return errors.New("certificate is expired") } - // 生成实体 + // 生成证书实体 certificate := &domain.Certificate{ Source: domain.CertificateSourceTypeUpload, } @@ -60,6 +71,7 @@ func (n *uploadNode) Run(ctx context.Context) error { // TODO: 先保持一个节点始终只有一个输出,后续增加版本控制 currentOutput := &domain.WorkflowOutput{ WorkflowId: getContextWorkflowId(ctx), + RunId: getContextWorkflowRunId(ctx), NodeId: n.node.Id, Node: n.node, Succeeded: true, @@ -76,3 +88,24 @@ func (n *uploadNode) Run(ctx context.Context) error { return nil } + +func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) { + if lastOutput != nil && lastOutput.Succeeded { + // 比较和上次上传时的关键配置(即影响证书上传的)参数是否一致 + currentNodeConfig := n.node.GetConfigForUpload() + lastNodeConfig := lastOutput.Node.GetConfigForUpload() + if strings.TrimSpace(currentNodeConfig.Certificate) != strings.TrimSpace(lastNodeConfig.Certificate) { + return false, "配置项变化:证书" + } + if strings.TrimSpace(currentNodeConfig.PrivateKey) != strings.TrimSpace(lastNodeConfig.PrivateKey) { + return false, "配置项变化:私钥" + } + + lastCertificate, _ := n.certRepo.GetByWorkflowNodeId(ctx, n.node.Id) + if lastCertificate != nil { + return true, "已上传过证书" + } + } + + return false, "" +} diff --git a/internal/workflow/processor/processor.go b/internal/workflow/processor/processor.go index 47663136..0923a8b2 100644 --- a/internal/workflow/processor/processor.go +++ b/internal/workflow/processor/processor.go @@ -8,24 +8,27 @@ import ( ) type workflowProcessor struct { - workflow *domain.Workflow - logs []domain.WorkflowRunLog + workflow *domain.Workflow + workflowRun *domain.WorkflowRun + workflorRunLogs []domain.WorkflowRunLog } -func NewWorkflowProcessor(workflow *domain.Workflow) *workflowProcessor { +func NewWorkflowProcessor(workflow *domain.Workflow, workflowRun *domain.WorkflowRun) *workflowProcessor { return &workflowProcessor{ - workflow: workflow, - logs: make([]domain.WorkflowRunLog, 0), + workflow: workflow, + workflowRun: workflowRun, + workflorRunLogs: make([]domain.WorkflowRunLog, 0), } } func (w *workflowProcessor) Run(ctx context.Context) error { - ctx = setContextWorkflowId(ctx, w.workflow.Id) + ctx = context.WithValue(ctx, "workflow_id", w.workflow.Id) + ctx = context.WithValue(ctx, "workflow_run_id", w.workflowRun.Id) return w.processNode(ctx, w.workflow.Content) } func (w *workflowProcessor) GetRunLogs() []domain.WorkflowRunLog { - return w.logs + return w.workflorRunLogs } func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error { @@ -49,9 +52,9 @@ func (w *workflowProcessor) processNode(ctx context.Context, node *domain.Workfl } runErr = processor.Run(ctx) - log := processor.Log(ctx) + log := processor.GetLog(ctx) if log != nil { - w.logs = append(w.logs, *log) + w.workflorRunLogs = append(w.workflorRunLogs, *log) } if runErr != nil { break @@ -75,10 +78,6 @@ func (w *workflowProcessor) processNode(ctx context.Context, node *domain.Workfl return nil } -func setContextWorkflowId(ctx context.Context, id string) context.Context { - return context.WithValue(ctx, "workflow_id", id) -} - func getBranchByType(branches []domain.WorkflowNode, nodeType domain.WorkflowNodeType) *domain.WorkflowNode { for _, branch := range branches { if branch.Type == nodeType { diff --git a/internal/workflow/service.go b/internal/workflow/service.go index 0a5f2f96..c3b4a3e0 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -141,7 +141,7 @@ func (s *WorkflowService) runWithData(ctx context.Context, runData *workflowRunD run = resp } - processor := processor.NewWorkflowProcessor(workflow) + processor := processor.NewWorkflowProcessor(workflow, run) if runErr := processor.Run(ctx); runErr != nil { run.Status = domain.WorkflowRunStatusTypeFailed run.EndedAt = time.Now() diff --git a/migrations/1738839725_updated_certificate.go b/migrations/1738839725_updated_certificate.go new file mode 100644 index 00000000..447d5297 --- /dev/null +++ b/migrations/1738839725_updated_certificate.go @@ -0,0 +1,67 @@ +package migrations + +import ( + "encoding/json" + + "github.com/pocketbase/pocketbase/core" + m "github.com/pocketbase/pocketbase/migrations" +) + +func init() { + m.Register(func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("4szxr9x43tpj6np") + if err != nil { + return err + } + + // update collection data + if err := json.Unmarshal([]byte(`{ + "indexes": [ + "CREATE INDEX ` + "`" + `idx_Jx8TXzDCmw` + "`" + ` ON ` + "`" + `certificate` + "`" + ` (` + "`" + `workflowId` + "`" + `)", + "CREATE INDEX ` + "`" + `idx_kcKpgAZapk` + "`" + ` ON ` + "`" + `certificate` + "`" + ` (` + "`" + `workflowNodeId` + "`" + `)", + "CREATE INDEX ` + "`" + `idx_2cRXqNDyyp` + "`" + ` ON ` + "`" + `certificate` + "`" + ` (` + "`" + `workflowRunId` + "`" + `)" + ] + }`), &collection); err != nil { + return err + } + + // add field + if err := collection.Fields.AddMarshaledJSONAt(15, []byte(`{ + "cascadeDelete": false, + "collectionId": "qjp8lygssgwyqyz", + "hidden": false, + "id": "relation3917999135", + "maxSelect": 1, + "minSelect": 0, + "name": "workflowRunId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + return app.Save(collection) + }, func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("4szxr9x43tpj6np") + if err != nil { + return err + } + + // update collection data + if err := json.Unmarshal([]byte(`{ + "indexes": [ + "CREATE INDEX ` + "`" + `idx_Jx8TXzDCmw` + "`" + ` ON ` + "`" + `certificate` + "`" + ` (` + "`" + `workflowId` + "`" + `)", + "CREATE INDEX ` + "`" + `idx_kcKpgAZapk` + "`" + ` ON ` + "`" + `certificate` + "`" + ` (` + "`" + `workflowNodeId` + "`" + `)" + ] + }`), &collection); err != nil { + return err + } + + // remove field + collection.Fields.RemoveById("relation3917999135") + + return app.Save(collection) + }) +} diff --git a/migrations/1738840633_updated_workflow_output.go b/migrations/1738840633_updated_workflow_output.go new file mode 100644 index 00000000..6e836a76 --- /dev/null +++ b/migrations/1738840633_updated_workflow_output.go @@ -0,0 +1,63 @@ +package migrations + +import ( + "encoding/json" + + "github.com/pocketbase/pocketbase/core" + m "github.com/pocketbase/pocketbase/migrations" +) + +func init() { + m.Register(func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("bqnxb95f2cooowp") + if err != nil { + return err + } + + // update collection data + if err := json.Unmarshal([]byte(`{ + "indexes": [ + "CREATE INDEX ` + "`" + `idx_BYoQPsz4my` + "`" + ` ON ` + "`" + `workflow_output` + "`" + ` (` + "`" + `workflowId` + "`" + `)", + "CREATE INDEX ` + "`" + `idx_O9zxLETuxJ` + "`" + ` ON ` + "`" + `workflow_output` + "`" + ` (` + "`" + `runId` + "`" + `)" + ] + }`), &collection); err != nil { + return err + } + + // add field + if err := collection.Fields.AddMarshaledJSONAt(2, []byte(`{ + "cascadeDelete": false, + "collectionId": "qjp8lygssgwyqyz", + "hidden": false, + "id": "relation821863227", + "maxSelect": 1, + "minSelect": 0, + "name": "runId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + return app.Save(collection) + }, func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("bqnxb95f2cooowp") + if err != nil { + return err + } + + // update collection data + if err := json.Unmarshal([]byte(`{ + "indexes": [] + }`), &collection); err != nil { + return err + } + + // remove field + collection.Fields.RemoveById("relation821863227") + + return app.Save(collection) + }) +}