mirror of
https://github.com/usual2970/certimate.git
synced 2025-07-30 06:04:29 +00:00
Add workflow execution process
This commit is contained in:
103
internal/workflow/node-processor/apply_node.go
Normal file
103
internal/workflow/node-processor/apply_node.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package nodeprocessor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/usual2970/certimate/internal/applicant"
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
"github.com/usual2970/certimate/internal/pkg/utils/x509"
|
||||
"github.com/usual2970/certimate/internal/repository"
|
||||
"github.com/usual2970/certimate/internal/utils/xtime"
|
||||
)
|
||||
|
||||
type applyNode struct {
|
||||
node *domain.WorkflowNode
|
||||
outputRepo WorkflowOutputRepository
|
||||
*Logger
|
||||
}
|
||||
|
||||
func NewApplyNode(node *domain.WorkflowNode) *applyNode {
|
||||
return &applyNode{
|
||||
node: node,
|
||||
Logger: NewLogger(node),
|
||||
outputRepo: repository.NewWorkflowOutputRepository(),
|
||||
}
|
||||
}
|
||||
|
||||
type WorkflowOutputRepository interface {
|
||||
// 查询节点输出
|
||||
Get(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
|
||||
|
||||
// 保存节点输出
|
||||
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
|
||||
}
|
||||
|
||||
// 申请节点根据申请类型执行不同的操作
|
||||
func (a *applyNode) Run(ctx context.Context) error {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "开始执行")
|
||||
// 查询是否申请过,已申请过则直接返回(先保持和 v0.2 一致)
|
||||
output, err := a.outputRepo.Get(ctx, a.node.Id)
|
||||
if err != nil && !domain.IsRecordNotFound(err) {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "查询申请记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if output != nil && output.Succeed {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "已申请过")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取Applicant
|
||||
apply, err := applicant.GetWithApplyNode(a.node)
|
||||
if err != nil {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "获取申请对象失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 申请
|
||||
certificate, err := apply.Apply()
|
||||
if err != nil {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "申请失败", err.Error())
|
||||
return err
|
||||
}
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "申请成功")
|
||||
|
||||
// 记录申请结果
|
||||
output = &domain.WorkflowOutput{
|
||||
Workflow: GetWorkflowId(ctx),
|
||||
NodeId: a.node.Id,
|
||||
Node: a.node,
|
||||
Succeed: true,
|
||||
}
|
||||
|
||||
cert, err := x509.ParseCertificateFromPEM(certificate.Certificate)
|
||||
if err != nil {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "解析证书失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
certificateRecord := &domain.Certificate{
|
||||
SAN: cert.Subject.CommonName,
|
||||
Certificate: certificate.Certificate,
|
||||
PrivateKey: certificate.PrivateKey,
|
||||
IssuerCertificate: certificate.IssuerCertificate,
|
||||
CertUrl: certificate.CertUrl,
|
||||
CertStableUrl: certificate.CertStableUrl,
|
||||
ExpireAt: cert.NotAfter,
|
||||
}
|
||||
|
||||
if err := a.outputRepo.Save(ctx, output, certificateRecord, func(id string) error {
|
||||
if certificateRecord != nil {
|
||||
certificateRecord.Id = id
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "保存申请记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "保存申请记录成功")
|
||||
|
||||
return nil
|
||||
}
|
29
internal/workflow/node-processor/condition_node.go
Normal file
29
internal/workflow/node-processor/condition_node.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package nodeprocessor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
"github.com/usual2970/certimate/internal/utils/xtime"
|
||||
)
|
||||
|
||||
type conditionNode struct {
|
||||
node *domain.WorkflowNode
|
||||
*Logger
|
||||
}
|
||||
|
||||
func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
|
||||
return &conditionNode{
|
||||
node: node,
|
||||
Logger: NewLogger(node),
|
||||
}
|
||||
}
|
||||
|
||||
// 条件节点没有任何操作
|
||||
func (c *conditionNode) Run(ctx context.Context) error {
|
||||
c.AddOutput(ctx, xtime.BeijingTimeStr(),
|
||||
c.node.Name,
|
||||
"完成",
|
||||
)
|
||||
return nil
|
||||
}
|
1
internal/workflow/node-processor/deploy_node.go
Normal file
1
internal/workflow/node-processor/deploy_node.go
Normal file
@@ -0,0 +1 @@
|
||||
package nodeprocessor
|
1
internal/workflow/node-processor/notify_node.go
Normal file
1
internal/workflow/node-processor/notify_node.go
Normal file
@@ -0,0 +1 @@
|
||||
package nodeprocessor
|
69
internal/workflow/node-processor/processor.go
Normal file
69
internal/workflow/node-processor/processor.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package nodeprocessor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
)
|
||||
|
||||
type RunLog struct {
|
||||
NodeName string `json:"node_name"`
|
||||
Err string `json:"err"`
|
||||
Outputs []RunLogOutput `json:"outputs"`
|
||||
}
|
||||
|
||||
type RunLogOutput struct {
|
||||
Time string `json:"time"`
|
||||
Title string `json:"title"`
|
||||
Content string `json:"content"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
type NodeProcessor interface {
|
||||
Run(ctx context.Context) error
|
||||
Log(ctx context.Context) *RunLog
|
||||
AddOutput(ctx context.Context, time, title, content string, err ...string)
|
||||
}
|
||||
|
||||
type Logger struct {
|
||||
log *RunLog
|
||||
}
|
||||
|
||||
func NewLogger(node *domain.WorkflowNode) *Logger {
|
||||
return &Logger{
|
||||
log: &RunLog{
|
||||
NodeName: node.Name,
|
||||
Outputs: make([]RunLogOutput, 0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Logger) Log(ctx context.Context) *RunLog {
|
||||
return l.log
|
||||
}
|
||||
|
||||
func (l *Logger) AddOutput(ctx context.Context, time, title, content string, err ...string) {
|
||||
output := RunLogOutput{
|
||||
Time: time,
|
||||
Title: title,
|
||||
Content: content,
|
||||
}
|
||||
if len(err) > 0 {
|
||||
output.Error = err[0]
|
||||
l.log.Err = err[0]
|
||||
}
|
||||
l.log.Outputs = append(l.log.Outputs, output)
|
||||
}
|
||||
|
||||
func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {
|
||||
switch node.Type {
|
||||
case domain.WorkflowNodeTypeStart:
|
||||
return NewStartNode(node), nil
|
||||
case domain.WorkflowNodeTypeCondition:
|
||||
return NewConditionNode(node), nil
|
||||
case domain.WorkflowNodeTypeApply:
|
||||
return NewApplyNode(node), nil
|
||||
}
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
29
internal/workflow/node-processor/start_node.go
Normal file
29
internal/workflow/node-processor/start_node.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package nodeprocessor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
"github.com/usual2970/certimate/internal/utils/xtime"
|
||||
)
|
||||
|
||||
type startNode struct {
|
||||
node *domain.WorkflowNode
|
||||
*Logger
|
||||
}
|
||||
|
||||
func NewStartNode(node *domain.WorkflowNode) *startNode {
|
||||
return &startNode{
|
||||
node: node,
|
||||
Logger: NewLogger(node),
|
||||
}
|
||||
}
|
||||
|
||||
// 开始节点没有任何操作
|
||||
func (s *startNode) Run(ctx context.Context) error {
|
||||
s.AddOutput(ctx, xtime.BeijingTimeStr(),
|
||||
s.node.Name,
|
||||
"完成",
|
||||
)
|
||||
return nil
|
||||
}
|
64
internal/workflow/node-processor/workflow_processor.go
Normal file
64
internal/workflow/node-processor/workflow_processor.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package nodeprocessor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
)
|
||||
|
||||
type workflowProcessor struct {
|
||||
workflow *domain.Workflow
|
||||
logs []RunLog
|
||||
}
|
||||
|
||||
func NewWorkflowProcessor(workflow *domain.Workflow) *workflowProcessor {
|
||||
return &workflowProcessor{
|
||||
workflow: workflow,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *workflowProcessor) Run(ctx context.Context) error {
|
||||
ctx = WithWorkflowId(ctx, w.workflow.Id)
|
||||
return w.runNode(ctx, w.workflow.Content)
|
||||
}
|
||||
|
||||
func (w *workflowProcessor) runNode(ctx context.Context, node *domain.WorkflowNode) error {
|
||||
current := node
|
||||
for current != nil {
|
||||
if current.Type == domain.WorkflowNodeTypeBranch {
|
||||
for _, branch := range current.Branches {
|
||||
if err := w.runNode(ctx, &branch); err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if current.Type != domain.WorkflowNodeTypeBranch {
|
||||
processor, err := GetProcessor(current)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = processor.Run(ctx)
|
||||
|
||||
log := processor.Log(ctx)
|
||||
if log != nil {
|
||||
w.logs = append(w.logs, *log)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func WithWorkflowId(ctx context.Context, id string) context.Context {
|
||||
return context.WithValue(ctx, "workflow_id", id)
|
||||
}
|
||||
|
||||
func GetWorkflowId(ctx context.Context) string {
|
||||
return ctx.Value("workflow_id").(string)
|
||||
}
|
52
internal/workflow/service.go
Normal file
52
internal/workflow/service.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
"github.com/usual2970/certimate/internal/utils/app"
|
||||
nodeprocessor "github.com/usual2970/certimate/internal/workflow/node-processor"
|
||||
)
|
||||
|
||||
type WorkflowRepository interface {
|
||||
Get(ctx context.Context, id string) (*domain.Workflow, error)
|
||||
}
|
||||
|
||||
type WorkflowService struct {
|
||||
repo WorkflowRepository
|
||||
}
|
||||
|
||||
func NewWorkflowService(repo WorkflowRepository) *WorkflowService {
|
||||
return &WorkflowService{
|
||||
repo: repo,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *WorkflowService) Run(ctx context.Context, req *domain.WorkflowRunReq) error {
|
||||
// 查询
|
||||
if req.Id == "" {
|
||||
return domain.ErrInvalidParams
|
||||
}
|
||||
|
||||
workflow, err := s.repo.Get(ctx, req.Id)
|
||||
if err != nil {
|
||||
app.GetApp().Logger().Error("failed to get workflow", "id", req.Id, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 执行
|
||||
if !workflow.Enabled {
|
||||
app.GetApp().Logger().Error("workflow is disabled", "id", req.Id)
|
||||
return fmt.Errorf("workflow is disabled")
|
||||
}
|
||||
|
||||
processor := nodeprocessor.NewWorkflowProcessor(workflow)
|
||||
if err := processor.Run(ctx); err != nil {
|
||||
return fmt.Errorf("failed to run workflow: %w", err)
|
||||
}
|
||||
|
||||
// 保存执行日志
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user