mirror of
https://github.com/usual2970/certimate.git
synced 2025-07-21 18:37:58 +00:00
refactor: clean code
This commit is contained in:
@@ -12,37 +12,35 @@ import (
|
||||
)
|
||||
|
||||
func Register() {
|
||||
const tableName = "workflow"
|
||||
|
||||
app := app.GetApp()
|
||||
app.OnRecordCreateRequest(tableName).BindFunc(func(e *core.RecordRequestEvent) error {
|
||||
app.OnRecordCreateRequest(domain.CollectionNameWorkflow).BindFunc(func(e *core.RecordRequestEvent) error {
|
||||
if err := e.Next(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := update(e.Request.Context(), e.Record); err != nil {
|
||||
if err := onWorkflowRecordCreateOrUpdate(e.Request.Context(), e.Record); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
app.OnRecordUpdateRequest(tableName).BindFunc(func(e *core.RecordRequestEvent) error {
|
||||
app.OnRecordUpdateRequest(domain.CollectionNameWorkflow).BindFunc(func(e *core.RecordRequestEvent) error {
|
||||
if err := e.Next(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := update(e.Request.Context(), e.Record); err != nil {
|
||||
if err := onWorkflowRecordCreateOrUpdate(e.Request.Context(), e.Record); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
app.OnRecordDeleteRequest(tableName).BindFunc(func(e *core.RecordRequestEvent) error {
|
||||
app.OnRecordDeleteRequest(domain.CollectionNameWorkflow).BindFunc(func(e *core.RecordRequestEvent) error {
|
||||
if err := e.Next(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := delete(e.Request.Context(), e.Record); err != nil {
|
||||
if err := onWorkflowRecordDelete(e.Request.Context(), e.Record); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -50,7 +48,7 @@ func Register() {
|
||||
})
|
||||
}
|
||||
|
||||
func update(ctx context.Context, record *core.Record) error {
|
||||
func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) error {
|
||||
scheduler := app.GetScheduler()
|
||||
|
||||
// 向数据库插入/更新时,同时更新定时任务
|
||||
@@ -79,7 +77,7 @@ func update(ctx context.Context, record *core.Record) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func delete(_ context.Context, record *core.Record) error {
|
||||
func onWorkflowRecordDelete(_ context.Context, record *core.Record) error {
|
||||
scheduler := app.GetScheduler()
|
||||
|
||||
// 从数据库删除时,同时移除定时任务
|
||||
|
@@ -77,14 +77,14 @@ func (a *applyNode) Run(ctx context.Context) error {
|
||||
ACMECertStableUrl: applyResult.ACMECertStableUrl,
|
||||
EffectAt: certX509.NotBefore,
|
||||
ExpireAt: certX509.NotAfter,
|
||||
WorkflowId: GetWorkflowId(ctx),
|
||||
WorkflowId: getContextWorkflowId(ctx),
|
||||
WorkflowNodeId: a.node.Id,
|
||||
}
|
||||
|
||||
// 保存执行结果
|
||||
// TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
|
||||
currentOutput := &domain.WorkflowOutput{
|
||||
WorkflowId: GetWorkflowId(ctx),
|
||||
WorkflowId: getContextWorkflowId(ctx),
|
||||
NodeId: a.node.Id,
|
||||
Node: a.node,
|
||||
Succeeded: true,
|
||||
|
@@ -81,7 +81,7 @@ func (d *deployNode) Run(ctx context.Context) error {
|
||||
// TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
|
||||
currentOutput := &domain.WorkflowOutput{
|
||||
Meta: domain.Meta{},
|
||||
WorkflowId: GetWorkflowId(ctx),
|
||||
WorkflowId: getContextWorkflowId(ctx),
|
||||
NodeId: d.node.Id,
|
||||
Node: d.node,
|
||||
Succeeded: true,
|
||||
|
@@ -18,6 +18,19 @@ type nodeLogger struct {
|
||||
log *domain.WorkflowRunLog
|
||||
}
|
||||
|
||||
type certificateRepository interface {
|
||||
GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error)
|
||||
}
|
||||
|
||||
type workflowOutputRepository interface {
|
||||
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
|
||||
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
|
||||
}
|
||||
|
||||
type settingsRepository interface {
|
||||
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
||||
}
|
||||
|
||||
func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger {
|
||||
return &nodeLogger{
|
||||
log: &domain.WorkflowRunLog{
|
||||
@@ -61,15 +74,6 @@ func GetProcessor(node *domain.WorkflowNode) (nodeProcessor, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
type certificateRepository interface {
|
||||
GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error)
|
||||
}
|
||||
|
||||
type workflowOutputRepository interface {
|
||||
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
|
||||
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
|
||||
}
|
||||
|
||||
type settingsRepository interface {
|
||||
GetByName(ctx context.Context, name string) (*domain.Settings, error)
|
||||
func getContextWorkflowId(ctx context.Context) string {
|
||||
return ctx.Value("workflow_id").(string)
|
||||
}
|
||||
|
@@ -1,9 +1,10 @@
|
||||
package nodeprocessor
|
||||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
|
||||
)
|
||||
|
||||
type workflowProcessor struct {
|
||||
@@ -23,23 +24,23 @@ func (w *workflowProcessor) Log(ctx context.Context) []domain.WorkflowRunLog {
|
||||
}
|
||||
|
||||
func (w *workflowProcessor) Run(ctx context.Context) error {
|
||||
ctx = WithWorkflowId(ctx, w.workflow.Id)
|
||||
return w.runNode(ctx, w.workflow.Content)
|
||||
ctx = setContextWorkflowId(ctx, w.workflow.Id)
|
||||
return w.processNode(ctx, w.workflow.Content)
|
||||
}
|
||||
|
||||
func (w *workflowProcessor) runNode(ctx context.Context, node *domain.WorkflowNode) error {
|
||||
func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error {
|
||||
current := node
|
||||
for current != nil {
|
||||
if current.Type == domain.WorkflowNodeTypeBranch {
|
||||
for _, branch := range current.Branches {
|
||||
if err := w.runNode(ctx, &branch); err != nil {
|
||||
if err := w.processNode(ctx, &branch); err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if current.Type != domain.WorkflowNodeTypeBranch {
|
||||
processor, err := GetProcessor(current)
|
||||
processor, err := nodes.GetProcessor(current)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -61,10 +62,6 @@ func (w *workflowProcessor) runNode(ctx context.Context, node *domain.WorkflowNo
|
||||
return nil
|
||||
}
|
||||
|
||||
func WithWorkflowId(ctx context.Context, id string) context.Context {
|
||||
func setContextWorkflowId(ctx context.Context, id string) context.Context {
|
||||
return context.WithValue(ctx, "workflow_id", id)
|
||||
}
|
||||
|
||||
func GetWorkflowId(ctx context.Context) string {
|
||||
return ctx.Value("workflow_id").(string)
|
||||
}
|
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"github.com/usual2970/certimate/internal/app"
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
nodeprocessor "github.com/usual2970/certimate/internal/workflow/node-processor"
|
||||
processor "github.com/usual2970/certimate/internal/workflow/processor"
|
||||
)
|
||||
|
||||
const defaultRoutines = 10
|
||||
@@ -129,7 +129,7 @@ func (s *WorkflowService) run(ctx context.Context, runData *workflowRunData) err
|
||||
StartedAt: time.Now(),
|
||||
EndedAt: time.Now(),
|
||||
}
|
||||
processor := nodeprocessor.NewWorkflowProcessor(workflow)
|
||||
processor := processor.NewWorkflowProcessor(workflow)
|
||||
if err := processor.Run(ctx); err != nil {
|
||||
run.Status = domain.WorkflowRunStatusTypeFailed
|
||||
run.EndedAt = time.Now()
|
||||
|
Reference in New Issue
Block a user