mirror of
https://github.com/usual2970/certimate.git
synced 2025-07-09 12:39:57 +00:00
Compare commits
8 Commits
8519ab4a8f
...
01593cb18d
Author | SHA1 | Date | |
---|---|---|---|
![]() |
01593cb18d | ||
![]() |
4f5c1dc6d7 | ||
![]() |
75c89b3d0b | ||
![]() |
b8513eb0b6 | ||
![]() |
1f6b33f4f6 | ||
![]() |
049707acdc | ||
![]() |
e019bfe136 | ||
![]() |
57f8db010b |
@ -31,17 +31,26 @@ const (
|
||||
type WorkflowRunLog struct {
|
||||
NodeId string `json:"nodeId"`
|
||||
NodeName string `json:"nodeName"`
|
||||
Records []WorkflowRunLogRecord `json:"records"`
|
||||
Error string `json:"error"`
|
||||
Outputs []WorkflowRunLogOutput `json:"outputs"`
|
||||
}
|
||||
|
||||
type WorkflowRunLogOutput struct {
|
||||
Time string `json:"time"`
|
||||
Title string `json:"title"`
|
||||
Content string `json:"content"`
|
||||
Error string `json:"error"`
|
||||
type WorkflowRunLogRecord struct {
|
||||
Time string `json:"time"`
|
||||
Level WorkflowRunLogLevel `json:"level"`
|
||||
Content string `json:"content"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
type WorkflowRunLogLevel string
|
||||
|
||||
const (
|
||||
WorkflowRunLogLevelDebug WorkflowRunLogLevel = "DEBUG"
|
||||
WorkflowRunLogLevelInfo WorkflowRunLogLevel = "INFO"
|
||||
WorkflowRunLogLevelWarn WorkflowRunLogLevel = "WARN"
|
||||
WorkflowRunLogLevelError WorkflowRunLogLevel = "ERROR"
|
||||
)
|
||||
|
||||
type WorkflowRunLogs []WorkflowRunLog
|
||||
|
||||
func (r WorkflowRunLogs) ErrorString() string {
|
||||
|
@ -131,6 +131,11 @@ func (d *K8sSecretDeployer) Deploy(ctx context.Context, certPem string, privkeyP
|
||||
secretPayload.ObjectMeta.Annotations[k] = v
|
||||
}
|
||||
}
|
||||
if secretPayload.Data == nil {
|
||||
secretPayload.Data = make(map[string][]byte)
|
||||
}
|
||||
secretPayload.Data[d.config.SecretDataKeyForCrt] = []byte(certPem)
|
||||
secretPayload.Data[d.config.SecretDataKeyForKey] = []byte(privkeyPem)
|
||||
secretPayload, err = client.CoreV1().Secrets(d.config.Namespace).Update(context.TODO(), secretPayload, k8sMeta.UpdateOptions{})
|
||||
if err != nil {
|
||||
return nil, xerrors.Wrap(err, "failed to update k8s secret")
|
||||
|
@ -24,7 +24,7 @@ func (r *WorkflowRepository) ListEnabledAuto(ctx context.Context) ([]*domain.Wor
|
||||
"enabled={:enabled} && trigger={:trigger}",
|
||||
"-created",
|
||||
0, 0,
|
||||
dbx.Params{"enabled": true, "trigger": domain.WorkflowTriggerTypeAuto},
|
||||
dbx.Params{"enabled": true, "trigger": string(domain.WorkflowTriggerTypeAuto)},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -61,7 +61,22 @@ func (r *WorkflowOutputRepository) SaveWithCertificate(ctx context.Context, work
|
||||
workflowOutput.UpdatedAt = record.GetDateTime("updated").Time()
|
||||
}
|
||||
|
||||
if certificate != nil {
|
||||
if certificate == nil {
|
||||
panic("certificate is nil")
|
||||
} else {
|
||||
if certificate.WorkflowId != "" && certificate.WorkflowId != workflowOutput.WorkflowId {
|
||||
return workflowOutput, fmt.Errorf("certificate #%s is not belong to workflow #%s", certificate.Id, workflowOutput.WorkflowId)
|
||||
}
|
||||
if certificate.WorkflowRunId != "" && certificate.WorkflowRunId != workflowOutput.RunId {
|
||||
return workflowOutput, fmt.Errorf("certificate #%s is not belong to workflow run #%s", certificate.Id, workflowOutput.RunId)
|
||||
}
|
||||
if certificate.WorkflowNodeId != "" && certificate.WorkflowNodeId != workflowOutput.NodeId {
|
||||
return workflowOutput, fmt.Errorf("certificate #%s is not belong to workflow node #%s", certificate.Id, workflowOutput.NodeId)
|
||||
}
|
||||
if certificate.WorkflowOutputId != "" && certificate.WorkflowOutputId != workflowOutput.Id {
|
||||
return workflowOutput, fmt.Errorf("certificate #%s is not belong to workflow output #%s", certificate.Id, workflowOutput.Id)
|
||||
}
|
||||
|
||||
certificate.WorkflowId = workflowOutput.WorkflowId
|
||||
certificate.WorkflowRunId = workflowOutput.RunId
|
||||
certificate.WorkflowNodeId = workflowOutput.NodeId
|
||||
@ -143,5 +158,5 @@ func (r *WorkflowOutputRepository) saveRecord(workflowOutput *domain.WorkflowOut
|
||||
return record, err
|
||||
}
|
||||
|
||||
return record, err
|
||||
return record, nil
|
||||
}
|
||||
|
@ -6,6 +6,6 @@ type certificateService interface {
|
||||
InitSchedule(ctx context.Context) error
|
||||
}
|
||||
|
||||
func NewCertificateScheduler(service certificateService) error {
|
||||
func InitCertificateScheduler(service certificateService) error {
|
||||
return service.InitSchedule(context.Background())
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"github.com/usual2970/certimate/internal/app"
|
||||
"github.com/usual2970/certimate/internal/certificate"
|
||||
"github.com/usual2970/certimate/internal/repository"
|
||||
"github.com/usual2970/certimate/internal/workflow"
|
||||
@ -14,7 +15,11 @@ func Register() {
|
||||
certificateRepo := repository.NewCertificateRepository()
|
||||
certificateSvc := certificate.NewCertificateService(certificateRepo)
|
||||
|
||||
NewCertificateScheduler(certificateSvc)
|
||||
if err := InitWorkflowScheduler(workflowSvc); err != nil {
|
||||
app.GetLogger().Error("failed to init workflow scheduler", "err", err)
|
||||
}
|
||||
|
||||
NewWorkflowScheduler(workflowSvc)
|
||||
if err := InitCertificateScheduler(certificateSvc); err != nil {
|
||||
app.GetLogger().Error("failed to init certificate scheduler", "err", err)
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,6 @@ type workflowService interface {
|
||||
InitSchedule(ctx context.Context) error
|
||||
}
|
||||
|
||||
func NewWorkflowScheduler(service workflowService) error {
|
||||
func InitWorkflowScheduler(service workflowService) error {
|
||||
return service.InitSchedule(context.Background())
|
||||
}
|
||||
|
@ -92,6 +92,7 @@ func (w *WorkflowDispatcher) Dispatch(data *WorkflowWorkerData) {
|
||||
}
|
||||
|
||||
w.enqueueWorker(data)
|
||||
|
||||
select {
|
||||
case w.chWork <- data:
|
||||
default:
|
||||
@ -138,6 +139,11 @@ func (w *WorkflowDispatcher) Shutdown() {
|
||||
w.queueMutex.Unlock()
|
||||
|
||||
// 等待所有正在执行的 WorkflowRun 完成
|
||||
w.workerMutex.Lock()
|
||||
for _, worker := range w.workers {
|
||||
worker.Cancel()
|
||||
}
|
||||
w.workerMutex.Unlock()
|
||||
w.wg.Wait()
|
||||
w.workers = make(map[string]*workflowWorker)
|
||||
w.workerIdMap = make(map[string]string)
|
||||
|
@ -72,7 +72,6 @@ func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) er
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
app.GetLogger().Error("add cron job failed", "err", err)
|
||||
return fmt.Errorf("add cron job failed: %w", err)
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ type applyNode struct {
|
||||
func NewApplyNode(node *domain.WorkflowNode) *applyNode {
|
||||
return &applyNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
|
||||
certRepo: repository.NewCertificateRepository(),
|
||||
outputRepo: repository.NewWorkflowOutputRepository(),
|
||||
@ -32,40 +32,40 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode {
|
||||
}
|
||||
|
||||
func (n *applyNode) Process(ctx context.Context) error {
|
||||
n.AddOutput(ctx, n.node.Name, "开始执行")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入申请证书节点")
|
||||
|
||||
// 查询上次执行结果
|
||||
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
|
||||
if err != nil && !domain.IsRecordNotFoundError(err) {
|
||||
n.AddOutput(ctx, n.node.Name, "查询申请记录失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询申请记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 检测是否可以跳过本次执行
|
||||
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
|
||||
n.AddOutput(ctx, n.node.Name, skipReason)
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 初始化申请器
|
||||
applicant, err := applicant.NewWithApplyNode(n.node)
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "获取申请对象失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取申请对象失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 申请证书
|
||||
applyResult, err := applicant.Apply()
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "申请失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "申请失败", err.Error())
|
||||
return err
|
||||
}
|
||||
n.AddOutput(ctx, n.node.Name, "申请成功")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "申请成功")
|
||||
|
||||
// 解析证书并生成实体
|
||||
certX509, err := certs.ParseCertificateFromPEM(applyResult.CertificateFullChain)
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "解析证书失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "解析证书失败", err.Error())
|
||||
return err
|
||||
}
|
||||
certificate := &domain.Certificate{
|
||||
@ -89,10 +89,10 @@ func (n *applyNode) Process(ctx context.Context) error {
|
||||
Outputs: n.node.Outputs,
|
||||
}
|
||||
if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "保存申请记录失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存申请记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
n.AddOutput(ctx, n.node.Name, "保存申请记录成功")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存申请记录成功")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -14,13 +14,11 @@ type conditionNode struct {
|
||||
func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
|
||||
return &conditionNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *conditionNode) Process(ctx context.Context) error {
|
||||
// 此类型节点不需要执行任何操作,直接返回
|
||||
n.AddOutput(ctx, n.node.Name, "完成")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ type deployNode struct {
|
||||
func NewDeployNode(node *domain.WorkflowNode) *deployNode {
|
||||
return &deployNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
|
||||
certRepo: repository.NewCertificateRepository(),
|
||||
outputRepo: repository.NewWorkflowOutputRepository(),
|
||||
@ -30,12 +30,12 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode {
|
||||
}
|
||||
|
||||
func (n *deployNode) Process(ctx context.Context) error {
|
||||
n.AddOutput(ctx, n.node.Name, "开始执行")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "开始执行")
|
||||
|
||||
// 查询上次执行结果
|
||||
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
|
||||
if err != nil && !domain.IsRecordNotFoundError(err) {
|
||||
n.AddOutput(ctx, n.node.Name, "查询部署记录失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询部署记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
@ -43,19 +43,19 @@ func (n *deployNode) Process(ctx context.Context) error {
|
||||
previousNodeOutputCertificateSource := n.node.GetConfigForDeploy().Certificate
|
||||
previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, "#")
|
||||
if len(previousNodeOutputCertificateSourceSlice) != 2 {
|
||||
n.AddOutput(ctx, n.node.Name, "证书来源配置错误", previousNodeOutputCertificateSource)
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "证书来源配置错误", previousNodeOutputCertificateSource)
|
||||
return fmt.Errorf("证书来源配置错误: %s", previousNodeOutputCertificateSource)
|
||||
}
|
||||
certificate, err := n.certRepo.GetByWorkflowNodeId(ctx, previousNodeOutputCertificateSourceSlice[0])
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "获取证书失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取证书失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 检测是否可以跳过本次执行
|
||||
if lastOutput != nil && certificate.CreatedAt.Before(lastOutput.UpdatedAt) {
|
||||
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
|
||||
n.AddOutput(ctx, n.node.Name, skipReason)
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -66,16 +66,16 @@ func (n *deployNode) Process(ctx context.Context) error {
|
||||
PrivateKey string
|
||||
}{Certificate: certificate.Certificate, PrivateKey: certificate.PrivateKey})
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "获取部署对象失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取部署对象失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 部署证书
|
||||
if err := deployer.Deploy(ctx); err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "部署失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "部署失败", err.Error())
|
||||
return err
|
||||
}
|
||||
n.AddOutput(ctx, n.node.Name, "部署成功")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "部署成功")
|
||||
|
||||
// 保存执行结果
|
||||
output := &domain.WorkflowOutput{
|
||||
@ -86,10 +86,10 @@ func (n *deployNode) Process(ctx context.Context) error {
|
||||
Succeeded: true,
|
||||
}
|
||||
if _, err := n.outputRepo.Save(ctx, output); err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "保存部署记录失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存部署记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
n.AddOutput(ctx, n.node.Name, "保存部署记录成功")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存部署记录成功")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -14,13 +14,13 @@ type executeFailureNode struct {
|
||||
func NewExecuteFailureNode(node *domain.WorkflowNode) *executeFailureNode {
|
||||
return &executeFailureNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *executeFailureNode) Process(ctx context.Context) error {
|
||||
// 此类型节点不需要执行任何操作,直接返回
|
||||
n.AddOutput(ctx, n.node.Name, "进入执行失败分支")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入执行失败分支")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -14,13 +14,13 @@ type executeSuccessNode struct {
|
||||
func NewExecuteSuccessNode(node *domain.WorkflowNode) *executeSuccessNode {
|
||||
return &executeSuccessNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *executeSuccessNode) Process(ctx context.Context) error {
|
||||
// 此类型节点不需要执行任何操作,直接返回
|
||||
n.AddOutput(ctx, n.node.Name, "进入执行成功分支")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入执行成功分支")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -18,37 +18,37 @@ type notifyNode struct {
|
||||
func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
|
||||
return ¬ifyNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
|
||||
settingsRepo: repository.NewSettingsRepository(),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifyNode) Process(ctx context.Context) error {
|
||||
n.AddOutput(ctx, n.node.Name, "开始执行")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入推送通知节点")
|
||||
|
||||
nodeConfig := n.node.GetConfigForNotify()
|
||||
|
||||
// 获取通知配置
|
||||
settings, err := n.settingsRepo.GetByName(ctx, "notifyChannels")
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "获取通知配置失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取通知配置失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 获取通知渠道
|
||||
channelConfig, err := settings.GetNotifyChannelConfig(nodeConfig.Channel)
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "获取通知渠道配置失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "获取通知渠道配置失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
if err := notify.SendToChannel(nodeConfig.Subject, nodeConfig.Message, nodeConfig.Channel, channelConfig); err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "发送通知失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "发送通知失败", err.Error())
|
||||
return err
|
||||
}
|
||||
n.AddOutput(ctx, n.node.Name, "发送通知成功")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "发送通知成功")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -10,8 +10,9 @@ import (
|
||||
|
||||
type NodeProcessor interface {
|
||||
Process(ctx context.Context) error
|
||||
|
||||
GetLog(ctx context.Context) *domain.WorkflowRunLog
|
||||
AddOutput(ctx context.Context, title, content string, err ...string)
|
||||
AppendLogRecord(ctx context.Context, level domain.WorkflowRunLogLevel, content string, err ...string)
|
||||
}
|
||||
|
||||
type nodeLogger struct {
|
||||
@ -32,12 +33,12 @@ type settingsRepository interface {
|
||||
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
||||
}
|
||||
|
||||
func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger {
|
||||
func newNodeLogger(node *domain.WorkflowNode) *nodeLogger {
|
||||
return &nodeLogger{
|
||||
log: &domain.WorkflowRunLog{
|
||||
NodeId: node.Id,
|
||||
NodeName: node.Name,
|
||||
Outputs: make([]domain.WorkflowRunLogOutput, 0),
|
||||
Records: make([]domain.WorkflowRunLogRecord, 0),
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -46,17 +47,17 @@ func (l *nodeLogger) GetLog(ctx context.Context) *domain.WorkflowRunLog {
|
||||
return l.log
|
||||
}
|
||||
|
||||
func (l *nodeLogger) AddOutput(ctx context.Context, title, content string, err ...string) {
|
||||
output := domain.WorkflowRunLogOutput{
|
||||
func (l *nodeLogger) AppendLogRecord(ctx context.Context, level domain.WorkflowRunLogLevel, content string, err ...string) {
|
||||
record := domain.WorkflowRunLogRecord{
|
||||
Time: time.Now().UTC().Format(time.RFC3339),
|
||||
Title: title,
|
||||
Level: level,
|
||||
Content: content,
|
||||
}
|
||||
if len(err) > 0 {
|
||||
output.Error = err[0]
|
||||
l.log.Error = err[0]
|
||||
record.Error = err[0]
|
||||
}
|
||||
l.log.Outputs = append(l.log.Outputs, output)
|
||||
|
||||
l.log.Records = append(l.log.Records, record)
|
||||
}
|
||||
|
||||
func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {
|
||||
|
@ -14,13 +14,13 @@ type startNode struct {
|
||||
func NewStartNode(node *domain.WorkflowNode) *startNode {
|
||||
return &startNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *startNode) Process(ctx context.Context) error {
|
||||
// 此类型节点不需要执行任何操作,直接返回
|
||||
n.AddOutput(ctx, n.node.Name, "完成")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入开始节点")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ type uploadNode struct {
|
||||
func NewUploadNode(node *domain.WorkflowNode) *uploadNode {
|
||||
return &uploadNode{
|
||||
node: node,
|
||||
nodeLogger: NewNodeLogger(node),
|
||||
nodeLogger: newNodeLogger(node),
|
||||
|
||||
certRepo: repository.NewCertificateRepository(),
|
||||
outputRepo: repository.NewWorkflowOutputRepository(),
|
||||
@ -30,20 +30,20 @@ func NewUploadNode(node *domain.WorkflowNode) *uploadNode {
|
||||
}
|
||||
|
||||
func (n *uploadNode) Process(ctx context.Context) error {
|
||||
n.AddOutput(ctx, n.node.Name, "进入上传证书节点")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "进入上传证书节点")
|
||||
|
||||
nodeConfig := n.node.GetConfigForUpload()
|
||||
|
||||
// 查询上次执行结果
|
||||
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
|
||||
if err != nil && !domain.IsRecordNotFoundError(err) {
|
||||
n.AddOutput(ctx, n.node.Name, "查询申请记录失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "查询申请记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 检测是否可以跳过本次执行
|
||||
if skippable, skipReason := n.checkCanSkip(ctx, lastOutput); skippable {
|
||||
n.AddOutput(ctx, n.node.Name, skipReason)
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, skipReason)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -51,11 +51,11 @@ func (n *uploadNode) Process(ctx context.Context) error {
|
||||
// 如果证书过期,则直接返回错误
|
||||
certX509, err := certs.ParseCertificateFromPEM(nodeConfig.Certificate)
|
||||
if err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "解析证书失败")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "解析证书失败")
|
||||
return err
|
||||
}
|
||||
if time.Now().After(certX509.NotAfter) {
|
||||
n.AddOutput(ctx, n.node.Name, "证书已过期")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelWarn, "证书已过期")
|
||||
return errors.New("certificate is expired")
|
||||
}
|
||||
|
||||
@ -75,10 +75,10 @@ func (n *uploadNode) Process(ctx context.Context) error {
|
||||
Outputs: n.node.Outputs,
|
||||
}
|
||||
if _, err := n.outputRepo.SaveWithCertificate(ctx, output, certificate); err != nil {
|
||||
n.AddOutput(ctx, n.node.Name, "保存上传记录失败", err.Error())
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelError, "保存上传记录失败", err.Error())
|
||||
return err
|
||||
}
|
||||
n.AddOutput(ctx, n.node.Name, "保存上传记录成功")
|
||||
n.AppendLogRecord(ctx, domain.WorkflowRunLogLevelInfo, "保存上传记录成功")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -48,6 +48,8 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
|
||||
|
||||
scheduler := app.GetScheduler()
|
||||
for _, workflow := range workflows {
|
||||
var errs []error
|
||||
|
||||
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
|
||||
s.StartRun(ctx, &dtos.WorkflowStartRunReq{
|
||||
WorkflowId: workflow.Id,
|
||||
@ -55,7 +57,11 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,17 @@
|
||||
import { useState } from "react";
|
||||
import { useTranslation } from "react-i18next";
|
||||
import { Alert, Typography } from "antd";
|
||||
import { SelectOutlined as SelectOutlinedIcon } from "@ant-design/icons";
|
||||
import { useRequest } from "ahooks";
|
||||
import { Alert, Button, Divider, Empty, Table, type TableProps, Tooltip, Typography, notification } from "antd";
|
||||
import dayjs from "dayjs";
|
||||
import { ClientResponseError } from "pocketbase";
|
||||
|
||||
import CertificateDetailDrawer from "@/components/certificate/CertificateDetailDrawer";
|
||||
import Show from "@/components/Show";
|
||||
import { type CertificateModel } from "@/domain/certificate";
|
||||
import { WORKFLOW_RUN_STATUSES, type WorkflowRunModel } from "@/domain/workflowRun";
|
||||
import { listByWorkflowRunId as listCertificateByWorkflowRunId } from "@/repository/certificate";
|
||||
import { getErrMsg } from "@/utils/error";
|
||||
|
||||
export type WorkflowRunDetailProps = {
|
||||
className?: string;
|
||||
@ -31,7 +39,7 @@ const WorkflowRunDetail = ({ data, ...props }: WorkflowRunDetailProps) => {
|
||||
<div key={i} className="flex flex-col space-y-2">
|
||||
<div className="font-semibold">{item.nodeName}</div>
|
||||
<div className="flex flex-col space-y-1">
|
||||
{item.outputs?.map((output, j) => {
|
||||
{item.records?.map((output, j) => {
|
||||
return (
|
||||
<div key={j} className="flex space-x-2 text-sm" style={{ wordBreak: "break-word" }}>
|
||||
<div className="whitespace-nowrap">[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]</div>
|
||||
@ -45,8 +53,108 @@ const WorkflowRunDetail = ({ data, ...props }: WorkflowRunDetailProps) => {
|
||||
})}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<Show when={data.status === WORKFLOW_RUN_STATUSES.SUCCEEDED}>
|
||||
<Divider />
|
||||
|
||||
<WorkflowRunArtifacts runId={data.id} />
|
||||
</Show>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
const WorkflowRunArtifacts = ({ runId }: { runId: string }) => {
|
||||
const { t } = useTranslation();
|
||||
|
||||
const [notificationApi, NotificationContextHolder] = notification.useNotification();
|
||||
|
||||
const tableColumns: TableProps<CertificateModel>["columns"] = [
|
||||
{
|
||||
key: "$index",
|
||||
align: "center",
|
||||
fixed: "left",
|
||||
width: 50,
|
||||
render: (_, __, index) => index + 1,
|
||||
},
|
||||
{
|
||||
key: "type",
|
||||
title: t("workflow_run_artifact.props.type"),
|
||||
render: () => t("workflow_run_artifact.props.type.certificate"),
|
||||
},
|
||||
{
|
||||
key: "name",
|
||||
title: t("workflow_run_artifact.props.name"),
|
||||
ellipsis: true,
|
||||
render: (_, record) => {
|
||||
return (
|
||||
<Typography.Text delete={!!record.deleted} ellipsis>
|
||||
{record.subjectAltNames}
|
||||
</Typography.Text>
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
key: "$action",
|
||||
align: "end",
|
||||
width: 120,
|
||||
render: (_, record) => (
|
||||
<Button.Group>
|
||||
<CertificateDetailDrawer
|
||||
data={record}
|
||||
trigger={
|
||||
<Tooltip title={t("certificate.action.view")}>
|
||||
<Button color="primary" disabled={!!record.deleted} icon={<SelectOutlinedIcon />} variant="text" />
|
||||
</Tooltip>
|
||||
}
|
||||
/>
|
||||
</Button.Group>
|
||||
),
|
||||
},
|
||||
];
|
||||
const [tableData, setTableData] = useState<CertificateModel[]>([]);
|
||||
const { loading: tableLoading } = useRequest(
|
||||
() => {
|
||||
return listCertificateByWorkflowRunId(runId);
|
||||
},
|
||||
{
|
||||
refreshDeps: [runId],
|
||||
onBefore: () => {
|
||||
setTableData([]);
|
||||
},
|
||||
onSuccess: (res) => {
|
||||
setTableData(res.items);
|
||||
},
|
||||
onError: (err) => {
|
||||
if (err instanceof ClientResponseError && err.isAbort) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.error(err);
|
||||
notificationApi.error({ message: t("common.text.request_error"), description: getErrMsg(err) });
|
||||
|
||||
throw err;
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
return (
|
||||
<>
|
||||
{NotificationContextHolder}
|
||||
|
||||
<Typography.Title level={5}>{t("workflow_run.artifacts")}</Typography.Title>
|
||||
<Table<CertificateModel>
|
||||
columns={tableColumns}
|
||||
dataSource={tableData}
|
||||
loading={tableLoading}
|
||||
locale={{
|
||||
emptyText: <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />,
|
||||
}}
|
||||
pagination={false}
|
||||
rowKey={(record) => record.id}
|
||||
size="small"
|
||||
/>
|
||||
</>
|
||||
);
|
||||
};
|
||||
|
||||
export default WorkflowRunDetail;
|
||||
|
@ -301,7 +301,7 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
|
||||
setPageSize(pageSize);
|
||||
},
|
||||
}}
|
||||
rowKey={(record: WorkflowRunModel) => record.id}
|
||||
rowKey={(record) => record.id}
|
||||
scroll={{ x: "max(100%, 960px)" }}
|
||||
/>
|
||||
</div>
|
||||
|
@ -86,7 +86,7 @@ const NotifyNodeConfigForm = forwardRef<NotifyNodeConfigFormInstance, NotifyNode
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item name="message" label={t("workflow_node.notify.form.message.label")} rules={[formRule]}>
|
||||
<Input.TextArea autoSize={{ minRows: 3, maxRows: 10 }} placeholder={t("workflow_node.notify.form.message.placeholder")} />
|
||||
<Input.TextArea autoSize={{ minRows: 3, maxRows: 5 }} placeholder={t("workflow_node.notify.form.message.placeholder")} />
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item className="mb-0">
|
||||
|
@ -3,8 +3,7 @@ import { useTranslation } from "react-i18next";
|
||||
import { Flex, Typography } from "antd";
|
||||
import { produce } from "immer";
|
||||
|
||||
import type { WorkflowNodeConfigForUpload } from "@/domain/workflow";
|
||||
import { WorkflowNodeType } from "@/domain/workflow";
|
||||
import { type WorkflowNodeConfigForUpload, WorkflowNodeType } from "@/domain/workflow";
|
||||
import { useZustandShallowSelector } from "@/hooks";
|
||||
import { useWorkflowStore } from "@/stores/workflow";
|
||||
|
||||
|
@ -141,7 +141,7 @@ const UploadNodeConfigForm = forwardRef<UploadNodeConfigFormInstance, UploadNode
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item name="certificate" label={t("workflow_node.upload.form.certificate.label")} rules={[formRule]}>
|
||||
<Input.TextArea readOnly autoSize={{ minRows: 5, maxRows: 10 }} placeholder={t("workflow_node.upload.form.certificate.placeholder")} />
|
||||
<Input.TextArea readOnly autoSize={{ minRows: 5, maxRows: 5 }} placeholder={t("workflow_node.upload.form.certificate.placeholder")} />
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item>
|
||||
@ -151,7 +151,7 @@ const UploadNodeConfigForm = forwardRef<UploadNodeConfigFormInstance, UploadNode
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item name="privateKey" label={t("workflow_node.upload.form.private_key.label")} rules={[formRule]}>
|
||||
<Input.TextArea readOnly autoSize={{ minRows: 5, maxRows: 10 }} placeholder={t("workflow_node.upload.form.private_key.placeholder")} />
|
||||
<Input.TextArea readOnly autoSize={{ minRows: 5, maxRows: 5 }} placeholder={t("workflow_node.upload.form.private_key.placeholder")} />
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item>
|
||||
|
@ -1 +1 @@
|
||||
export const version = "v0.3.0-alpha.10";
|
||||
export const version = "v0.3.0-alpha.11";
|
||||
|
@ -1,4 +1,4 @@
|
||||
import type { WorkflowModel } from "./workflow";
|
||||
import { type WorkflowModel } from "./workflow";
|
||||
|
||||
export interface WorkflowRunModel extends BaseModel {
|
||||
workflowId: string;
|
||||
@ -16,13 +16,13 @@ export interface WorkflowRunModel extends BaseModel {
|
||||
export type WorkflowRunLog = {
|
||||
nodeId: string;
|
||||
nodeName: string;
|
||||
outputs?: WorkflowRunLogOutput[];
|
||||
records?: WorkflowRunLogRecord[];
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type WorkflowRunLogOutput = {
|
||||
export type WorkflowRunLogRecord = {
|
||||
time: ISO8601String;
|
||||
title: string;
|
||||
level: string;
|
||||
content: string;
|
||||
error?: string;
|
||||
};
|
||||
|
@ -18,7 +18,7 @@
|
||||
"workflow_node.start.form.trigger_cron.label": "Cron expression",
|
||||
"workflow_node.start.form.trigger_cron.placeholder": "Please enter cron expression",
|
||||
"workflow_node.start.form.trigger_cron.errmsg.invalid": "Please enter a valid cron expression",
|
||||
"workflow_node.start.form.trigger_cron.tooltip": "Time zone is based on the server.",
|
||||
"workflow_node.start.form.trigger_cron.tooltip": "Exactly 5 space separated segments. Time zone is based on the server.",
|
||||
"workflow_node.start.form.trigger_cron.extra": "Expected execution time for the last 5 times:",
|
||||
"workflow_node.start.form.trigger_cron.guide": "Tips: If you have multiple workflows, it is recommended to set them to run at multiple times of the day instead of always running at specific times.<br><br>Reference links:<br>1. <a href=\"https://letsencrypt.org/docs/rate-limits/\" target=\"_blank\">Let’s Encrypt rate limits</a><br>2. <a href=\"https://letsencrypt.org/docs/faq/#why-should-my-let-s-encrypt-acme-client-run-at-a-random-time\" target=\"_blank\">Why should my Let’s Encrypt (ACME) client run at a random time?</a>",
|
||||
|
||||
|
@ -16,5 +16,11 @@
|
||||
"workflow_run.props.trigger.auto": "Timing",
|
||||
"workflow_run.props.trigger.manual": "Manual",
|
||||
"workflow_run.props.started_at": "Started at",
|
||||
"workflow_run.props.ended_at": "Ended at"
|
||||
"workflow_run.props.ended_at": "Ended at",
|
||||
|
||||
"workflow_run.artifacts": "Artifacts",
|
||||
|
||||
"workflow_run_artifact.props.type": "Type",
|
||||
"workflow_run_artifact.props.type.certificate": "Certificate",
|
||||
"workflow_run_artifact.props.name": "Name"
|
||||
}
|
||||
|
@ -18,7 +18,7 @@
|
||||
"workflow_node.start.form.trigger_cron.label": "Cron 表达式",
|
||||
"workflow_node.start.form.trigger_cron.placeholder": "请输入 Cron 表达式",
|
||||
"workflow_node.start.form.trigger_cron.errmsg.invalid": "请输入正确的 Cron 表达式",
|
||||
"workflow_node.start.form.trigger_cron.tooltip": "支持使用任意值(即 <strong>*</strong>)、值列表分隔符(即 <strong>,</strong>)、值的范围(即 <strong>-</strong>)、步骤值(即 <strong>/</strong>)等四种表达式,时区以服务器设置为准。",
|
||||
"workflow_node.start.form.trigger_cron.tooltip": "五段式表达式,支持使用任意值(即 <strong>*</strong>)、值列表分隔符(即 <strong>,</strong>)、值的范围(即 <strong>-</strong>)、步骤值(即 <strong>/</strong>)等四种表达式。时区以服务器设置为准。",
|
||||
"workflow_node.start.form.trigger_cron.extra": "预计最近 5 次执行时间:",
|
||||
"workflow_node.start.form.trigger_cron.guide": "小贴士:如果你有多个工作流,建议将它们设置为在一天中的多个时间段运行,而非总是在相同的特定时间。<br><br>参考链接:<br>1. <a href=\"https://letsencrypt.org/zh-cn/docs/rate-limits/\" target=\"_blank\">Let’s Encrypt 速率限制</a><br>2. <a href=\"https://letsencrypt.org/zh-cn/docs/faq/#%E4%B8%BA%E4%BB%80%E4%B9%88%E6%88%91%E7%9A%84-let-s-encrypt-acme-%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%90%AF%E5%8A%A8%E6%97%B6%E9%97%B4%E5%BA%94%E5%BD%93%E9%9A%8F%E6%9C%BA\" target=\"_blank\">为什么我的 Let’s Encrypt (ACME) 客户端启动时间应当随机?</a>",
|
||||
|
||||
|
@ -16,5 +16,11 @@
|
||||
"workflow_run.props.trigger.auto": "定时执行",
|
||||
"workflow_run.props.trigger.manual": "手动执行",
|
||||
"workflow_run.props.started_at": "开始时间",
|
||||
"workflow_run.props.ended_at": "完成时间"
|
||||
"workflow_run.props.ended_at": "完成时间",
|
||||
|
||||
"workflow_run.artifacts": "输出产物",
|
||||
|
||||
"workflow_run_artifact.props.type": "类型",
|
||||
"workflow_run_artifact.props.type.certificate": "证书",
|
||||
"workflow_run_artifact.props.name": "名称"
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ const AccessList = () => {
|
||||
setPageSize(pageSize);
|
||||
},
|
||||
}}
|
||||
rowKey={(record: AccessModel) => record.id}
|
||||
rowKey={(record) => record.id}
|
||||
scroll={{ x: "max(100%, 960px)" }}
|
||||
/>
|
||||
</div>
|
||||
|
@ -276,7 +276,7 @@ const CertificateList = () => {
|
||||
setPageSize(pageSize);
|
||||
},
|
||||
}}
|
||||
rowKey={(record: CertificateModel) => record.id}
|
||||
rowKey={(record) => record.id}
|
||||
scroll={{ x: "max(100%, 960px)" }}
|
||||
/>
|
||||
</div>
|
||||
|
@ -15,8 +15,7 @@ import {
|
||||
} from "@ant-design/icons";
|
||||
import { PageHeader } from "@ant-design/pro-components";
|
||||
import { useRequest } from "ahooks";
|
||||
import type { TableProps } from "antd";
|
||||
import { Button, Card, Col, Divider, Empty, Flex, Grid, Row, Space, Statistic, Table, Tag, Typography, notification, theme } from "antd";
|
||||
import { Button, Card, Col, Divider, Empty, Flex, Grid, Row, Space, Statistic, Table, type TableProps, Tag, Typography, notification, theme } from "antd";
|
||||
import dayjs from "dayjs";
|
||||
import {
|
||||
CalendarClock as CalendarClockIcon,
|
||||
@ -177,7 +176,7 @@ const Dashboard = () => {
|
||||
() => {
|
||||
return listWorkflowRuns({
|
||||
page: 1,
|
||||
perPage: 5,
|
||||
perPage: 9,
|
||||
expand: true,
|
||||
});
|
||||
},
|
||||
@ -285,8 +284,9 @@ const Dashboard = () => {
|
||||
emptyText: <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />,
|
||||
}}
|
||||
pagination={false}
|
||||
rowKey={(record: WorkflowRunModel) => record.id}
|
||||
rowKey={(record) => record.id}
|
||||
scroll={{ x: "max(100%, 960px)" }}
|
||||
size="small"
|
||||
/>
|
||||
</Card>
|
||||
</Flex>
|
||||
|
@ -366,7 +366,7 @@ const WorkflowList = () => {
|
||||
setPageSize(pageSize);
|
||||
},
|
||||
}}
|
||||
rowKey={(record: WorkflowModel) => record.id}
|
||||
rowKey={(record) => record.id}
|
||||
scroll={{ x: "max(100%, 960px)" }}
|
||||
/>
|
||||
</div>
|
||||
|
@ -38,6 +38,23 @@ export const list = async (request: ListCertificateRequest) => {
|
||||
return pb.collection(COLLECTION_NAME).getList<CertificateModel>(page, perPage, options);
|
||||
};
|
||||
|
||||
export const listByWorkflowRunId = async (workflowRunId: string) => {
|
||||
const pb = getPocketBase();
|
||||
|
||||
const options: RecordListOptions = {
|
||||
filter: pb.filter("workflowRunId={:workflowRunId}", {
|
||||
workflowRunId: workflowRunId,
|
||||
}),
|
||||
sort: "-created",
|
||||
requestKey: null,
|
||||
};
|
||||
const items = await pb.collection(COLLECTION_NAME).getFullList<CertificateModel>(options);
|
||||
return {
|
||||
totalItems: items.length,
|
||||
items: items,
|
||||
};
|
||||
};
|
||||
|
||||
export const remove = async (record: MaybeModelRecordWithId<CertificateModel>) => {
|
||||
await getPocketBase()
|
||||
.collection(COLLECTION_NAME)
|
||||
|
@ -3,6 +3,8 @@
|
||||
export const validCronExpression = (expr: string): boolean => {
|
||||
try {
|
||||
parseExpression(expr);
|
||||
|
||||
if (expr.trim().split(" ").length !== 5) return false; // pocketbase 后端仅支持五段式的表达式
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
@ -10,19 +12,15 @@ export const validCronExpression = (expr: string): boolean => {
|
||||
};
|
||||
|
||||
export const getNextCronExecutions = (expr: string, times = 1): Date[] => {
|
||||
if (!expr) return [];
|
||||
if (!validCronExpression(expr)) return [];
|
||||
|
||||
try {
|
||||
const now = new Date();
|
||||
const cron = parseExpression(expr, { currentDate: now, iterator: true });
|
||||
const now = new Date();
|
||||
const cron = parseExpression(expr, { currentDate: now, iterator: true });
|
||||
|
||||
const result: Date[] = [];
|
||||
for (let i = 0; i < times; i++) {
|
||||
const next = cron.next();
|
||||
result.push(next.value.toDate());
|
||||
}
|
||||
return result;
|
||||
} catch {
|
||||
return [];
|
||||
const result: Date[] = [];
|
||||
for (let i = 0; i < times; i++) {
|
||||
const next = cron.next();
|
||||
result.push(next.value.toDate());
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user