feat: rename san to subjectAltNames, workflow to workflowId, nodeId to workflowNodeId, output to workflowOutputId, log to logs, succeed to succeeded

This commit is contained in:
Fu Diwei
2025-01-04 16:29:14 +08:00
parent 9246878d0e
commit ae11d5ee3d
26 changed files with 823 additions and 175 deletions

View File

@@ -2,42 +2,25 @@ package domain
import "time"
var ValidityDuration = time.Hour * 24 * 10
type Certificate struct {
Meta
SubjectAltNames string `json:"san" db:"san"`
Source string `json:"source" db:"source"`
SubjectAltNames string `json:"subjectAltNames" db:"subjectAltNames"`
Certificate string `json:"certificate" db:"certificate"`
PrivateKey string `json:"privateKey" db:"privateKey"`
IssuerCertificate string `json:"issuerCertificate" db:"issuerCertificate"`
CertUrl string `json:"certUrl" db:"certUrl"`
CertStableUrl string `json:"certStableUrl" db:"certStableUrl"`
WorkflowId string `json:"workflow" db:"workflow"`
WorkflowNodeId string `json:"nodeId" db:"nodeId"`
WorkflowOutputId string `json:"output" db:"output"`
EffectAt time.Time `json:"effectAt" db:"effectAt"`
ExpireAt time.Time `json:"expireAt" db:"expireAt"`
AcmeCertUrl string `json:"acmeCertUrl" db:"acmeCertUrl"`
AcmeCertStableUrl string `json:"acmeCertStableUrl" db:"acmeCertStableUrl"`
WorkflowId string `json:"workflowId" db:"workflowId"`
WorkflowNodeId string `json:"workflowNodeId" db:"workflowNodeId"`
WorkflowOutputId string `json:"workflowOutputId" db:"workflowOutputId"`
}
type CertificateMeta struct {
Version string `json:"version"`
SerialNumber string `json:"serialNumber"`
Validity CertificateValidity `json:"validity"`
SignatureAlgorithm string `json:"signatureAlgorithm"`
Issuer CertificateIssuer `json:"issuer"`
Subject CertificateSubject `json:"subject"`
}
type CertificateSourceType string
type CertificateIssuer struct {
Country string `json:"country"`
Organization string `json:"organization"`
CommonName string `json:"commonName"`
}
type CertificateSubject struct {
CN string `json:"CN"`
}
type CertificateValidity struct {
NotBefore string `json:"notBefore"`
NotAfter string `json:"notAfter"`
}
const (
CERTIFICATE_SOURCE_WORKFLOW = CertificateSourceType("workflow")
CERTIFICATE_SOURCE_UPLOAD = CertificateSourceType("upload")
)

View File

@@ -4,9 +4,9 @@ const WorkflowOutputCertificate = "certificate"
type WorkflowOutput struct {
Meta
Workflow string `json:"workflow"`
NodeId string `json:"nodeId"`
Node *WorkflowNode `json:"node"`
Output []WorkflowNodeIO `json:"output"`
Succeed bool `json:"succeed"`
WorkflowId string `json:"workflowId" db:"workflowId"`
NodeId string `json:"nodeId" db:"nodeId"`
Node *WorkflowNode `json:"node" db:"node"`
Outputs []WorkflowNodeIO `json:"outputs" db:"outputs"`
Succeeded bool `json:"succeeded"db:"succeeded"`
}

View File

@@ -1,5 +1,13 @@
package domain
type WorkflowRunLog struct {
Meta
WorkflowId string `json:"workflowId" db:"workflowId"`
Logs []RunLog `json:"logs" db:"logs"`
Succeeded bool `json:"succeeded" db:"succeeded"`
Error string `json:"error" db:"error"`
}
type RunLogOutput struct {
Time string `json:"time"`
Title string `json:"title"`
@@ -8,19 +16,12 @@ type RunLogOutput struct {
}
type RunLog struct {
NodeId string `json:"nodeId"`
NodeName string `json:"nodeName"`
Error string `json:"error"`
Outputs []RunLogOutput `json:"outputs"`
}
type WorkflowRunLog struct {
Meta
Workflow string `json:"workflow"`
Log []RunLog `json:"log"`
Succeed bool `json:"succeed"`
Error string `json:"error"`
}
type RunLogs []RunLog
func (r RunLogs) Error() string {

View File

@@ -44,9 +44,9 @@ func (w *WorkflowRepository) SaveRunLog(ctx context.Context, log *domain.Workflo
}
record := models.NewRecord(collection)
record.Set("workflow", log.Workflow)
record.Set("log", log.Log)
record.Set("succeed", log.Succeed)
record.Set("workflowId", log.WorkflowId)
record.Set("logs", log.Logs)
record.Set("succeeded", log.Succeeded)
record.Set("error", log.Error)
return app.GetApp().Dao().SaveRecord(record)

View File

@@ -35,8 +35,8 @@ func (w *WorkflowOutputRepository) Get(ctx context.Context, nodeId string) (*dom
return nil, errors.New("failed to unmarshal node")
}
output := make([]domain.WorkflowNodeIO, 0)
if err := record.UnmarshalJSONField("output", &output); err != nil {
outputs := make([]domain.WorkflowNodeIO, 0)
if err := record.UnmarshalJSONField("outputs", &outputs); err != nil {
return nil, errors.New("failed to unmarshal output")
}
@@ -46,18 +46,18 @@ func (w *WorkflowOutputRepository) Get(ctx context.Context, nodeId string) (*dom
CreatedAt: record.GetCreated().Time(),
UpdatedAt: record.GetUpdated().Time(),
},
Workflow: record.GetString("workflow"),
NodeId: record.GetString("nodeId"),
Node: node,
Output: output,
Succeed: record.GetBool("succeed"),
WorkflowId: record.GetString("workflowId"),
NodeId: record.GetString("nodeId"),
Node: node,
Outputs: outputs,
Succeeded: record.GetBool("succeeded"),
}
return rs, nil
}
func (w *WorkflowOutputRepository) GetCertificate(ctx context.Context, nodeId string) (*domain.Certificate, error) {
records, err := app.GetApp().Dao().FindRecordsByFilter("certificate", "nodeId={:nodeId}", "-created", 1, 0, dbx.Params{"nodeId": nodeId})
records, err := app.GetApp().Dao().FindRecordsByFilter("certificate", "workflowNodeId={:workflowNodeId}", "-created", 1, 0, dbx.Params{"workflowNodeId": nodeId})
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, domain.ErrRecordNotFound
@@ -76,16 +76,18 @@ func (w *WorkflowOutputRepository) GetCertificate(ctx context.Context, nodeId st
CreatedAt: record.GetDateTime("created").Time(),
UpdatedAt: record.GetDateTime("updated").Time(),
},
Source: record.GetString("source"),
SubjectAltNames: record.GetString("subjectAltNames"),
Certificate: record.GetString("certificate"),
PrivateKey: record.GetString("privateKey"),
IssuerCertificate: record.GetString("issuerCertificate"),
SubjectAltNames: record.GetString("san"),
WorkflowOutputId: record.GetString("output"),
EffectAt: record.GetDateTime("effectAt").Time(),
ExpireAt: record.GetDateTime("expireAt").Time(),
CertUrl: record.GetString("certUrl"),
CertStableUrl: record.GetString("certStableUrl"),
WorkflowId: record.GetString("workflow"),
WorkflowNodeId: record.GetString("nodeId"),
AcmeCertUrl: record.GetString("acmeCertUrl"),
AcmeCertStableUrl: record.GetString("acmeCertStableUrl"),
WorkflowId: record.GetString("workflowId"),
WorkflowNodeId: record.GetString("workflowNodeId"),
WorkflowOutputId: record.GetString("workflowOutputId"),
}
return rs, nil
}
@@ -107,11 +109,11 @@ func (w *WorkflowOutputRepository) Save(ctx context.Context, output *domain.Work
return err
}
}
record.Set("workflow", output.Workflow)
record.Set("workflowId", output.WorkflowId)
record.Set("nodeId", output.NodeId)
record.Set("node", output.Node)
record.Set("output", output.Output)
record.Set("succeed", output.Succeed)
record.Set("outputs", output.Outputs)
record.Set("succeeded", output.Succeeded)
if err := app.GetApp().Dao().SaveRecord(record); err != nil {
return err
@@ -128,30 +130,32 @@ func (w *WorkflowOutputRepository) Save(ctx context.Context, output *domain.Work
}
certRecord := models.NewRecord(certCollection)
certRecord.Set("source", certificate.Source)
certRecord.Set("subjectAltNames", certificate.SubjectAltNames)
certRecord.Set("certificate", certificate.Certificate)
certRecord.Set("privateKey", certificate.PrivateKey)
certRecord.Set("issuerCertificate", certificate.IssuerCertificate)
certRecord.Set("san", certificate.SubjectAltNames)
certRecord.Set("output", certificate.WorkflowOutputId)
certRecord.Set("effectAt", certificate.EffectAt)
certRecord.Set("expireAt", certificate.ExpireAt)
certRecord.Set("certUrl", certificate.CertUrl)
certRecord.Set("certStableUrl", certificate.CertStableUrl)
certRecord.Set("workflow", certificate.WorkflowId)
certRecord.Set("nodeId", certificate.WorkflowNodeId)
certRecord.Set("acmeCertUrl", certificate.AcmeCertUrl)
certRecord.Set("acmeCertStableUrl", certificate.AcmeCertStableUrl)
certRecord.Set("workflowId", certificate.WorkflowId)
certRecord.Set("workflowNodeId", certificate.WorkflowNodeId)
certRecord.Set("workflowOutputId", certificate.WorkflowOutputId)
if err := app.GetApp().Dao().SaveRecord(certRecord); err != nil {
return err
}
// 更新 certificate
for i, item := range output.Output {
for i, item := range output.Outputs {
if item.Name == "certificate" {
output.Output[i].Value = certRecord.GetId()
output.Outputs[i].Value = certRecord.GetId()
break
}
}
record.Set("output", output.Output)
record.Set("outputs", output.Outputs)
if err := app.GetApp().Dao().SaveRecord(record); err != nil {
return err

View File

@@ -64,7 +64,7 @@ func update(ctx context.Context, record *models.Record) error {
app.GetApp().Logger().Error("add cron job failed", "err", err)
return fmt.Errorf("add cron job failed: %w", err)
}
app.GetApp().Logger().Error("add cron job failed", "san", record.GetString("san"))
app.GetApp().Logger().Error("add cron job failed", "subjectAltNames", record.GetString("subjectAltNames"))
scheduler.Start()
return nil

View File

@@ -17,6 +17,8 @@ type applyNode struct {
*Logger
}
var validityDuration = time.Hour * 24 * 10
func NewApplyNode(node *domain.WorkflowNode) *applyNode {
return &applyNode{
node: node,
@@ -46,14 +48,14 @@ func (a *applyNode) Run(ctx context.Context) error {
return err
}
if output != nil && output.Succeed {
if output != nil && output.Succeeded {
cert, err := a.outputRepo.GetCertificate(ctx, a.node.Id)
if err != nil {
a.AddOutput(ctx, a.node.Name, "获取证书失败", err.Error())
return err
}
if time.Until(cert.ExpireAt) > domain.ValidityDuration {
if time.Until(cert.ExpireAt) > validityDuration {
a.AddOutput(ctx, a.node.Name, "已申请过证书,且证书在有效期内")
return nil
}
@@ -81,28 +83,30 @@ func (a *applyNode) Run(ctx context.Context) error {
outputId = output.Id
}
output = &domain.WorkflowOutput{
Workflow: GetWorkflowId(ctx),
NodeId: a.node.Id,
Node: a.node,
Succeed: true,
Output: a.node.Output,
Meta: domain.Meta{Id: outputId},
Meta: domain.Meta{Id: outputId},
WorkflowId: GetWorkflowId(ctx),
NodeId: a.node.Id,
Node: a.node,
Succeeded: true,
Outputs: a.node.Output,
}
cert, err := x509.ParseCertificateFromPEM(certificate.Certificate)
certX509, err := x509.ParseCertificateFromPEM(certificate.Certificate)
if err != nil {
a.AddOutput(ctx, a.node.Name, "解析证书失败", err.Error())
return err
}
certificateRecord := &domain.Certificate{
SubjectAltNames: strings.Join(cert.DNSNames, ";"),
Source: string(domain.CERTIFICATE_SOURCE_WORKFLOW),
SubjectAltNames: strings.Join(certX509.DNSNames, ";"),
Certificate: certificate.Certificate,
PrivateKey: certificate.PrivateKey,
IssuerCertificate: certificate.IssuerCertificate,
CertUrl: certificate.CertUrl,
CertStableUrl: certificate.CertStableUrl,
ExpireAt: cert.NotAfter,
AcmeCertUrl: certificate.CertUrl,
AcmeCertStableUrl: certificate.CertStableUrl,
EffectAt: certX509.NotBefore,
ExpireAt: certX509.NotAfter,
WorkflowId: GetWorkflowId(ctx),
WorkflowNodeId: a.node.Id,
}

View File

@@ -71,8 +71,8 @@ func (d *deployNode) Run(ctx context.Context) error {
AccessConfig: access.Config,
AccessRecord: access,
Certificate: applicant.Certificate{
CertUrl: cert.CertUrl,
CertStableUrl: cert.CertStableUrl,
CertUrl: cert.AcmeCertUrl,
CertStableUrl: cert.AcmeCertStableUrl,
PrivateKey: cert.PrivateKey,
Certificate: cert.Certificate,
IssuerCertificate: cert.IssuerCertificate,
@@ -105,11 +105,11 @@ func (d *deployNode) Run(ctx context.Context) error {
outputId = output.Id
}
output = &domain.WorkflowOutput{
Workflow: GetWorkflowId(ctx),
NodeId: d.node.Id,
Node: d.node,
Succeed: true,
Meta: domain.Meta{Id: outputId},
Meta: domain.Meta{Id: outputId},
WorkflowId: GetWorkflowId(ctx),
NodeId: d.node.Id,
Node: d.node,
Succeeded: true,
}
if err := d.outputRepo.Save(ctx, output, nil, nil); err != nil {
@@ -123,5 +123,5 @@ func (d *deployNode) Run(ctx context.Context) error {
}
func (d *deployNode) deployed(output *domain.WorkflowOutput) bool {
return output != nil && output.Succeed
return output != nil && output.Succeeded
}

View File

@@ -21,6 +21,7 @@ type Logger struct {
func NewLogger(node *domain.WorkflowNode) *Logger {
return &Logger{
log: &domain.RunLog{
NodeId: node.Id,
NodeName: node.Name,
Outputs: make([]domain.RunLogOutput, 0),
},

View File

@@ -69,10 +69,10 @@ func (s *WorkflowService) Run(ctx context.Context, req *domain.WorkflowRunReq) e
processor := nodeprocessor.NewWorkflowProcessor(workflow)
if err := processor.Run(ctx); err != nil {
log := &domain.WorkflowRunLog{
Workflow: workflow.Id,
Log: processor.Log(ctx),
Succeed: false,
Error: err.Error(),
WorkflowId: workflow.Id,
Logs: processor.Log(ctx),
Succeeded: false,
Error: err.Error(),
}
if err := s.repo.SaveRunLog(ctx, log); err != nil {
app.GetApp().Logger().Error("failed to save run log", "err", err)
@@ -89,10 +89,10 @@ func (s *WorkflowService) Run(ctx context.Context, req *domain.WorkflowRunReq) e
succeed = false
}
log := &domain.WorkflowRunLog{
Workflow: workflow.Id,
Log: processor.Log(ctx),
Error: runErr,
Succeed: succeed,
WorkflowId: workflow.Id,
Logs: processor.Log(ctx),
Error: runErr,
Succeeded: succeed,
}
if err := s.repo.SaveRunLog(ctx, log); err != nil {
app.GetApp().Logger().Error("failed to save run log", "err", err)