refactor: clean code

This commit is contained in:
Fu Diwei
2025-01-05 00:08:12 +08:00
parent 3b9a7fe805
commit 61843a4997
69 changed files with 972 additions and 839 deletions

View File

@@ -14,7 +14,7 @@ import (
const tableName = "workflow"
func AddEvent() error {
func RegisterEvents() error {
app := app.GetApp()
app.OnRecordAfterCreateRequest(tableName).Add(func(e *core.RecordCreateEvent) error {
@@ -32,30 +32,23 @@ func AddEvent() error {
return nil
}
func delete(_ context.Context, record *models.Record) error {
id := record.Id
scheduler := app.GetScheduler()
scheduler.Remove(id)
scheduler.Start()
return nil
}
func update(ctx context.Context, record *models.Record) error {
// 是不是自动
// 是不是 enabled
scheduler := app.GetScheduler()
workflowId := record.Id
// 向数据库插入/更新时,同时更新定时任务
workflowId := record.GetId()
enabled := record.GetBool("enabled")
trigger := record.GetString("trigger")
scheduler := app.GetScheduler()
// 如果是手动触发或未启用,移除定时任务
if !enabled || trigger == string(domain.WorkflowTriggerTypeManual) {
scheduler.Remove(workflowId)
scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId))
scheduler.Start()
return nil
}
err := scheduler.Add(workflowId, record.GetString("triggerCron"), func() {
// 反之,重新添加定时任务
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() {
NewWorkflowService(repository.NewWorkflowRepository()).Run(ctx, &domain.WorkflowRunReq{
WorkflowId: workflowId,
Trigger: domain.WorkflowTriggerTypeAuto,
@@ -65,8 +58,19 @@ func update(ctx context.Context, record *models.Record) error {
app.GetLogger().Error("add cron job failed", "err", err)
return fmt.Errorf("add cron job failed: %w", err)
}
app.GetLogger().Error("add cron job failed", "subjectAltNames", record.GetString("subjectAltNames"))
scheduler.Start()
return nil
}
func delete(_ context.Context, record *models.Record) error {
scheduler := app.GetScheduler()
// 从数据库删除时,同时移除定时任务
workflowId := record.GetId()
scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId))
scheduler.Start()
return nil
}

View File

@@ -29,10 +29,10 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode {
type WorkflowOutputRepository interface {
// 查询节点输出
Get(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
// 查询申请节点的证书
GetCertificate(ctx context.Context, nodeId string) (*domain.Certificate, error)
GetCertificateByNodeId(ctx context.Context, nodeId string) (*domain.Certificate, error)
// 保存节点输出
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
@@ -42,14 +42,14 @@ type WorkflowOutputRepository interface {
func (a *applyNode) Run(ctx context.Context) error {
a.AddOutput(ctx, a.node.Name, "开始执行")
// 查询是否申请过,已申请过则直接返回(先保持和 v0.2 一致)
output, err := a.outputRepo.Get(ctx, a.node.Id)
output, err := a.outputRepo.GetByNodeId(ctx, a.node.Id)
if err != nil && !domain.IsRecordNotFound(err) {
a.AddOutput(ctx, a.node.Name, "查询申请记录失败", err.Error())
return err
}
if output != nil && output.Succeeded {
cert, err := a.outputRepo.GetCertificate(ctx, a.node.Id)
cert, err := a.outputRepo.GetCertificateByNodeId(ctx, a.node.Id)
if err != nil {
a.AddOutput(ctx, a.node.Name, "获取证书失败", err.Error())
return err
@@ -62,14 +62,14 @@ func (a *applyNode) Run(ctx context.Context) error {
}
// 获取Applicant
apply, err := applicant.GetWithApplyNode(a.node)
applicant, err := applicant.GetWithApplyNode(a.node)
if err != nil {
a.AddOutput(ctx, a.node.Name, "获取申请对象失败", err.Error())
return err
}
// 申请
certificate, err := apply.Apply()
certificate, err := applicant.Apply()
if err != nil {
a.AddOutput(ctx, a.node.Name, "申请失败", err.Error())
return err
@@ -103,8 +103,8 @@ func (a *applyNode) Run(ctx context.Context) error {
Certificate: certificate.Certificate,
PrivateKey: certificate.PrivateKey,
IssuerCertificate: certificate.IssuerCertificate,
AcmeCertUrl: certificate.CertUrl,
AcmeCertStableUrl: certificate.CertStableUrl,
ACMECertUrl: certificate.CertUrl,
ACMECertStableUrl: certificate.CertStableUrl,
EffectAt: certX509.NotBefore,
ExpireAt: certX509.NotAfter,
WorkflowId: GetWorkflowId(ctx),

View File

@@ -28,7 +28,7 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode {
func (d *deployNode) Run(ctx context.Context) error {
d.AddOutput(ctx, d.node.Name, "开始执行")
// 检查是否部署过(部署过则直接返回,和 v0.2 暂时保持一致)
output, err := d.outputRepo.Get(ctx, d.node.Id)
output, err := d.outputRepo.GetByNodeId(ctx, d.node.Id)
if err != nil && !domain.IsRecordNotFound(err) {
d.AddOutput(ctx, d.node.Name, "查询部署记录失败", err.Error())
return err
@@ -43,7 +43,7 @@ func (d *deployNode) Run(ctx context.Context) error {
return fmt.Errorf("证书来源配置错误: %s", certSource)
}
cert, err := d.outputRepo.GetCertificate(ctx, certSourceSlice[0])
cert, err := d.outputRepo.GetCertificateByNodeId(ctx, certSourceSlice[0])
if err != nil {
d.AddOutput(ctx, d.node.Name, "获取证书失败", err.Error())
return err
@@ -71,8 +71,8 @@ func (d *deployNode) Run(ctx context.Context) error {
AccessConfig: access.Config,
AccessRecord: access,
Certificate: applicant.Certificate{
CertUrl: cert.AcmeCertUrl,
CertStableUrl: cert.AcmeCertStableUrl,
CertUrl: cert.ACMECertUrl,
CertStableUrl: cert.ACMECertStableUrl,
PrivateKey: cert.PrivateKey,
Certificate: cert.Certificate,
IssuerCertificate: cert.IssuerCertificate,
@@ -85,7 +85,7 @@ func (d *deployNode) Run(ctx context.Context) error {
},
}
deploy, err := deployer.GetWithTypeAndOption(d.node.GetConfigString("provider"), option)
deploy, err := deployer.GetWithProviderAndOption(d.node.GetConfigString("provider"), option)
if err != nil {
d.AddOutput(ctx, d.node.Name, "获取部署对象失败", err.Error())
return err

View File

@@ -11,7 +11,7 @@ import (
)
type WorkflowRepository interface {
Get(ctx context.Context, id string) (*domain.Workflow, error)
GetById(ctx context.Context, id string) (*domain.Workflow, error)
SaveRun(ctx context.Context, run *domain.WorkflowRun) error
ListEnabledAuto(ctx context.Context) ([]domain.Workflow, error)
}
@@ -27,7 +27,6 @@ func NewWorkflowService(repo WorkflowRepository) *WorkflowService {
}
func (s *WorkflowService) InitSchedule(ctx context.Context) error {
// 查询所有的 enabled auto workflow
workflows, err := s.repo.ListEnabledAuto(ctx)
if err != nil {
return err
@@ -35,7 +34,7 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
scheduler := app.GetScheduler()
for _, workflow := range workflows {
err := scheduler.Add(workflow.Id, workflow.TriggerCron, func() {
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
s.Run(ctx, &domain.WorkflowRunReq{
WorkflowId: workflow.Id,
Trigger: domain.WorkflowTriggerTypeAuto,
@@ -55,21 +54,12 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunReq) error {
// 查询
if options.WorkflowId == "" {
return domain.ErrInvalidParams
}
workflow, err := s.repo.Get(ctx, options.WorkflowId)
workflow, err := s.repo.GetById(ctx, options.WorkflowId)
if err != nil {
app.GetLogger().Error("failed to get workflow", "id", options.WorkflowId, "err", err)
return err
}
if !workflow.Enabled {
app.GetLogger().Error("workflow is disabled", "id", options.WorkflowId)
return fmt.Errorf("workflow is disabled")
}
// 执行
run := &domain.WorkflowRun{
WorkflowId: workflow.Id,
@@ -78,7 +68,6 @@ func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunRe
StartedAt: time.Now(),
EndedAt: time.Now(),
}
processor := nodeprocessor.NewWorkflowProcessor(workflow)
if err := processor.Run(ctx); err != nil {
run.Status = domain.WorkflowRunStatusTypeFailed
@@ -93,7 +82,7 @@ func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunRe
return fmt.Errorf("failed to run workflow: %w", err)
}
// 保存执行日志
// 保存日志
logs := processor.Log(ctx)
runStatus := domain.WorkflowRunStatusTypeSucceeded
runError := domain.WorkflowRunLogs(logs).FirstError()