Implement complete workflow execution process

This commit is contained in:
yoan
2024-11-19 16:02:31 +08:00
parent a9d5b53460
commit 03b2a9da66
13 changed files with 302 additions and 27 deletions

View File

@@ -7,7 +7,6 @@ import (
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/pkg/utils/x509"
"github.com/usual2970/certimate/internal/repository"
"github.com/usual2970/certimate/internal/utils/xtime"
)
type applyNode struct {
@@ -28,39 +27,42 @@ type WorkflowOutputRepository interface {
// 查询节点输出
Get(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
// 查询申请节点的证书
GetCertificate(ctx context.Context, nodeId string) (*domain.Certificate, error)
// 保存节点输出
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
}
// 申请节点根据申请类型执行不同的操作
func (a *applyNode) Run(ctx context.Context) error {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "开始执行")
a.AddOutput(ctx, a.node.Name, "开始执行")
// 查询是否申请过,已申请过则直接返回(先保持和 v0.2 一致)
output, err := a.outputRepo.Get(ctx, a.node.Id)
if err != nil && !domain.IsRecordNotFound(err) {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "查询申请记录失败", err.Error())
a.AddOutput(ctx, a.node.Name, "查询申请记录失败", err.Error())
return err
}
if output != nil && output.Succeed {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "已申请过")
a.AddOutput(ctx, a.node.Name, "已申请过")
return nil
}
// 获取Applicant
apply, err := applicant.GetWithApplyNode(a.node)
if err != nil {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "获取申请对象失败", err.Error())
a.AddOutput(ctx, a.node.Name, "获取申请对象失败", err.Error())
return err
}
// 申请
certificate, err := apply.Apply()
if err != nil {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "申请失败", err.Error())
a.AddOutput(ctx, a.node.Name, "申请失败", err.Error())
return err
}
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "申请成功")
a.AddOutput(ctx, a.node.Name, "申请成功")
// 记录申请结果
output = &domain.WorkflowOutput{
@@ -68,11 +70,12 @@ func (a *applyNode) Run(ctx context.Context) error {
NodeId: a.node.Id,
Node: a.node,
Succeed: true,
Output: a.node.Output,
}
cert, err := x509.ParseCertificateFromPEM(certificate.Certificate)
if err != nil {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "解析证书失败", err.Error())
a.AddOutput(ctx, a.node.Name, "解析证书失败", err.Error())
return err
}
@@ -84,20 +87,22 @@ func (a *applyNode) Run(ctx context.Context) error {
CertUrl: certificate.CertUrl,
CertStableUrl: certificate.CertStableUrl,
ExpireAt: cert.NotAfter,
Workflow: GetWorkflowId(ctx),
NodeId: a.node.Id,
}
if err := a.outputRepo.Save(ctx, output, certificateRecord, func(id string) error {
if certificateRecord != nil {
certificateRecord.Id = id
certificateRecord.Output = id
}
return nil
}); err != nil {
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "保存申请记录失败", err.Error())
a.AddOutput(ctx, a.node.Name, "保存申请记录失败", err.Error())
return err
}
a.AddOutput(ctx, xtime.BeijingTimeStr(), a.node.Name, "保存申请记录成功")
a.AddOutput(ctx, a.node.Name, "保存申请记录成功")
return nil
}

View File

@@ -1 +1,104 @@
package nodeprocessor
package nodeprocessor
import (
"context"
"fmt"
"strings"
"github.com/usual2970/certimate/internal/deployer"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/repository"
)
type deployNode struct {
node *domain.WorkflowNode
outputRepo WorkflowOutputRepository
*Logger
}
func NewDeployNode(node *domain.WorkflowNode) *deployNode {
return &deployNode{
node: node,
Logger: NewLogger(node),
outputRepo: repository.NewWorkflowOutputRepository(),
}
}
func (d *deployNode) Run(ctx context.Context) error {
d.AddOutput(ctx, d.node.Name, "开始执行")
// 检查是否部署过(部署过则直接返回,和 v0.2 暂时保持一致)
output, err := d.outputRepo.Get(ctx, d.node.Id)
if err != nil && !domain.IsRecordNotFound(err) {
d.AddOutput(ctx, d.node.Name, "查询部署记录失败", err.Error())
return err
}
if output != nil && output.Succeed {
d.AddOutput(ctx, d.node.Name, "已部署过")
return nil
}
// 获取部署对象
// 获取证书
certSource := d.node.GetConfigString("certificate")
certSourceSlice := strings.Split(certSource, "#")
if len(certSourceSlice) != 2 {
d.AddOutput(ctx, d.node.Name, "证书来源配置错误", certSource)
return fmt.Errorf("证书来源配置错误: %s", certSource)
}
cert, err := d.outputRepo.GetCertificate(ctx, certSourceSlice[0])
if err != nil {
d.AddOutput(ctx, d.node.Name, "获取证书失败", err.Error())
return err
}
accessRepo := repository.NewAccessRepository()
access, err := accessRepo.GetById(context.Background(), d.node.GetConfigString("access"))
if err != nil {
d.AddOutput(ctx, d.node.Name, "获取授权配置失败", err.Error())
return err
}
option := &deployer.DeployerOption{
DomainId: d.node.Id,
Domain: cert.SAN,
Access: access.Config,
AccessRecord: access,
DeployConfig: domain.DeployConfig{
Id: d.node.Id,
Access: access.Id,
Type: d.node.GetConfigString("providerType"),
Config: d.node.Config,
},
}
deploy, err := deployer.GetWithTypeAndOption(d.node.GetConfigString("providerType"), option)
if err != nil {
d.AddOutput(ctx, d.node.Name, "获取部署对象失败", err.Error())
return err
}
// 部署
if err := deploy.Deploy(ctx); err != nil {
d.AddOutput(ctx, d.node.Name, "部署失败", err.Error())
return err
}
d.AddOutput(ctx, d.node.Name, "部署成功")
// 记录部署结果
output = &domain.WorkflowOutput{
Workflow: GetWorkflowId(ctx),
NodeId: d.node.Id,
Node: d.node,
Succeed: true,
}
if err := d.outputRepo.Save(ctx, output, nil, nil); err != nil {
d.AddOutput(ctx, d.node.Name, "保存部署记录失败", err.Error())
return err
}
d.AddOutput(ctx, d.node.Name, "保存部署记录成功")
return nil
}

View File

@@ -1 +1,55 @@
package nodeprocessor
package nodeprocessor
import (
"context"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/notify"
"github.com/usual2970/certimate/internal/repository"
)
type SettingRepository interface {
GetByName(ctx context.Context, name string) (*domain.Setting, error)
}
type notifyNode struct {
node *domain.WorkflowNode
settingRepo SettingRepository
*Logger
}
func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
return &notifyNode{
node: node,
Logger: NewLogger(node),
settingRepo: repository.NewSettingRepository(),
}
}
func (n *notifyNode) Run(ctx context.Context) error {
n.AddOutput(ctx, n.node.Name, "开始执行")
// 获取通知配置
setting, err := n.settingRepo.GetByName(ctx, "notifyChannels")
if err != nil {
n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error())
return err
}
channelConfig, err := setting.GetChannelContent(n.node.GetConfigString("channel"))
if err != nil {
n.AddOutput(ctx, n.node.Name, "获取通知渠道配置失败", err.Error())
return err
}
if err := notify.SendToChannel(n.node.GetConfigString("title"),
n.node.GetConfigString("content"),
n.node.GetConfigString("channel"),
channelConfig,
); err != nil {
n.AddOutput(ctx, n.node.Name, "发送通知失败", err.Error())
return err
}
n.AddOutput(ctx, n.node.Name, "发送通知成功")
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/utils/xtime"
)
type RunLog struct {
@@ -23,7 +24,7 @@ type RunLogOutput struct {
type NodeProcessor interface {
Run(ctx context.Context) error
Log(ctx context.Context) *RunLog
AddOutput(ctx context.Context, time, title, content string, err ...string)
AddOutput(ctx context.Context, title, content string, err ...string)
}
type Logger struct {
@@ -43,9 +44,9 @@ func (l *Logger) Log(ctx context.Context) *RunLog {
return l.log
}
func (l *Logger) AddOutput(ctx context.Context, time, title, content string, err ...string) {
func (l *Logger) AddOutput(ctx context.Context, title, content string, err ...string) {
output := RunLogOutput{
Time: time,
Time: xtime.BeijingTimeStr(),
Title: title,
Content: content,
}
@@ -64,6 +65,10 @@ func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {
return NewConditionNode(node), nil
case domain.WorkflowNodeTypeApply:
return NewApplyNode(node), nil
case domain.WorkflowNodeTypeDeploy:
return NewDeployNode(node), nil
case domain.WorkflowNodeTypeNotify:
return NewNotifyNode(node), nil
}
return nil, errors.New("not implemented")
}

View File

@@ -50,6 +50,7 @@ func (w *workflowProcessor) runNode(ctx context.Context, node *domain.WorkflowNo
return err
}
}
current = current.Next
}
return nil