fix conflict

This commit is contained in:
yoan
2025-01-19 19:02:58 +08:00
142 changed files with 3458 additions and 8971 deletions

View File

@@ -5,49 +5,67 @@ import (
"fmt"
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/models"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/dtos"
"github.com/usual2970/certimate/internal/repository"
)
const tableName = "workflow"
func Register() {
app := app.GetApp()
app.OnRecordCreateRequest(domain.CollectionNameWorkflow).BindFunc(func(e *core.RecordRequestEvent) error {
if err := e.Next(); err != nil {
return err
}
app.OnRecordAfterCreateRequest(tableName).Add(func(e *core.RecordCreateEvent) error {
return update(e.HttpContext.Request().Context(), e.Record)
if err := onWorkflowRecordCreateOrUpdate(e.Request.Context(), e.Record); err != nil {
return err
}
return nil
})
app.OnRecordUpdateRequest(domain.CollectionNameWorkflow).BindFunc(func(e *core.RecordRequestEvent) error {
if err := e.Next(); err != nil {
return err
}
app.OnRecordAfterUpdateRequest(tableName).Add(func(e *core.RecordUpdateEvent) error {
return update(e.HttpContext.Request().Context(), e.Record)
if err := onWorkflowRecordCreateOrUpdate(e.Request.Context(), e.Record); err != nil {
return err
}
return nil
})
app.OnRecordDeleteRequest(domain.CollectionNameWorkflow).BindFunc(func(e *core.RecordRequestEvent) error {
if err := e.Next(); err != nil {
return err
}
app.OnRecordAfterDeleteRequest(tableName).Add(func(e *core.RecordDeleteEvent) error {
return delete(e.HttpContext.Request().Context(), e.Record)
if err := onWorkflowRecordDelete(e.Request.Context(), e.Record); err != nil {
return err
}
return nil
})
}
func update(ctx context.Context, record *models.Record) error {
func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) error {
scheduler := app.GetScheduler()
// 向数据库插入/更新时,同时更新定时任务
workflowId := record.GetId()
workflowId := record.Id
enabled := record.GetBool("enabled")
trigger := record.GetString("trigger")
// 如果是手动触发或未启用,移除定时任务
if !enabled || trigger == string(domain.WorkflowTriggerTypeManual) {
scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId))
scheduler.Start()
return nil
}
// 反之,重新添加定时任务
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() {
NewWorkflowService(repository.NewWorkflowRepository()).Run(ctx, &domain.WorkflowRunReq{
NewWorkflowService(repository.NewWorkflowRepository()).Run(ctx, &dtos.WorkflowRunReq{
WorkflowId: workflowId,
Trigger: domain.WorkflowTriggerTypeAuto,
})
@@ -57,18 +75,15 @@ func update(ctx context.Context, record *models.Record) error {
return fmt.Errorf("add cron job failed: %w", err)
}
scheduler.Start()
return nil
}
func delete(_ context.Context, record *models.Record) error {
func onWorkflowRecordDelete(_ context.Context, record *core.Record) error {
scheduler := app.GetScheduler()
// 从数据库删除时,同时移除定时任务
workflowId := record.GetId()
workflowId := record.Id
scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId))
scheduler.Start()
return nil
}

View File

@@ -77,14 +77,14 @@ func (a *applyNode) Run(ctx context.Context) error {
ACMECertStableUrl: applyResult.ACMECertStableUrl,
EffectAt: certX509.NotBefore,
ExpireAt: certX509.NotAfter,
WorkflowId: GetWorkflowId(ctx),
WorkflowId: getContextWorkflowId(ctx),
WorkflowNodeId: a.node.Id,
}
// 保存执行结果
// TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
currentOutput := &domain.WorkflowOutput{
WorkflowId: GetWorkflowId(ctx),
WorkflowId: getContextWorkflowId(ctx),
NodeId: a.node.Id,
Node: a.node,
Succeeded: true,
@@ -109,32 +109,32 @@ func (a *applyNode) Run(ctx context.Context) error {
}
func (a *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) {
const validityDuration = time.Hour * 24 * 10
// TODO: 可控制是否强制申请
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次申请时的关键配置(即影响证书签发的)参数是否一致
if lastOutput.Node.GetConfigString("domains") != a.node.GetConfigString("domains") {
currentNodeConfig := a.node.GetConfigForApply()
lastNodeConfig := lastOutput.Node.GetConfigForApply()
if currentNodeConfig.Domains != lastNodeConfig.Domains {
return false, "配置项变化:域名"
}
if lastOutput.Node.GetConfigString("contactEmail") != a.node.GetConfigString("contactEmail") {
if currentNodeConfig.ContactEmail != lastNodeConfig.ContactEmail {
return false, "配置项变化:联系邮箱"
}
if lastOutput.Node.GetConfigString("provider") != a.node.GetConfigString("provider") {
if currentNodeConfig.ProviderAccessId != lastNodeConfig.ProviderAccessId {
return false, "配置项变化DNS 提供商授权"
}
if !maps.Equal(lastOutput.Node.GetConfigMap("providerConfig"), a.node.GetConfigMap("providerConfig")) {
if !maps.Equal(currentNodeConfig.ProviderConfig, lastNodeConfig.ProviderConfig) {
return false, "配置项变化DNS 提供商参数"
}
if lastOutput.Node.GetConfigString("keyAlgorithm") != a.node.GetConfigString("keyAlgorithm") {
if currentNodeConfig.KeyAlgorithm != lastNodeConfig.KeyAlgorithm {
return false, "配置项变化:数字签名算法"
}
lastCertificate, _ := a.certRepo.GetByWorkflowNodeId(ctx, a.node.Id)
if lastCertificate != nil && time.Until(lastCertificate.ExpireAt) > validityDuration {
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
if lastCertificate != nil && time.Until(lastCertificate.ExpireAt) > renewalInterval {
return true, "已申请过证书,且证书尚未临近过期"
}
}
return false, "无历史申请记录"
return false, ""
}

View File

@@ -38,13 +38,13 @@ func (d *deployNode) Run(ctx context.Context) error {
}
// 获取前序节点输出证书
certSource := d.node.GetConfigString("certificate")
certSourceSlice := strings.Split(certSource, "#")
if len(certSourceSlice) != 2 {
d.AddOutput(ctx, d.node.Name, "证书来源配置错误", certSource)
return fmt.Errorf("证书来源配置错误: %s", certSource)
previousNodeOutputCertificateSource := d.node.GetConfigForDeploy().Certificate
previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, "#")
if len(previousNodeOutputCertificateSourceSlice) != 2 {
d.AddOutput(ctx, d.node.Name, "证书来源配置错误", previousNodeOutputCertificateSource)
return fmt.Errorf("证书来源配置错误: %s", previousNodeOutputCertificateSource)
}
certificate, err := d.certRepo.GetByWorkflowNodeId(ctx, certSourceSlice[0])
certificate, err := d.certRepo.GetByWorkflowNodeId(ctx, previousNodeOutputCertificateSourceSlice[0])
if err != nil {
d.AddOutput(ctx, d.node.Name, "获取证书失败", err.Error())
return err
@@ -81,7 +81,7 @@ func (d *deployNode) Run(ctx context.Context) error {
// TODO: 先保持一个节点始终只有一个输出,后续增加版本控制
currentOutput := &domain.WorkflowOutput{
Meta: domain.Meta{},
WorkflowId: GetWorkflowId(ctx),
WorkflowId: getContextWorkflowId(ctx),
NodeId: d.node.Id,
Node: d.node,
Succeeded: true,
@@ -99,18 +99,21 @@ func (d *deployNode) Run(ctx context.Context) error {
}
func (d *deployNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) {
// TODO: 可控制是否强制部署
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次部署时的关键配置(即影响证书部署的)参数是否一致
if lastOutput.Node.GetConfigString("provider") != d.node.GetConfigString("provider") {
currentNodeConfig := d.node.GetConfigForDeploy()
lastNodeConfig := lastOutput.Node.GetConfigForDeploy()
if currentNodeConfig.ProviderAccessId != lastNodeConfig.ProviderAccessId {
return false, "配置项变化:主机提供商授权"
}
if !maps.Equal(lastOutput.Node.GetConfigMap("providerConfig"), d.node.GetConfigMap("providerConfig")) {
if !maps.Equal(currentNodeConfig.ProviderConfig, lastNodeConfig.ProviderConfig) {
return false, "配置项变化:主机提供商参数"
}
return true, "已部署过证书"
if currentNodeConfig.SkipOnLastSucceeded {
return true, "已部署过证书"
}
}
return false, "无历史部署记录"
return false, ""
}

View File

@@ -10,7 +10,7 @@ import (
type notifyNode struct {
node *domain.WorkflowNode
settingsRepo settingRepository
settingsRepo settingsRepository
*nodeLogger
}
@@ -25,6 +25,8 @@ func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
func (n *notifyNode) Run(ctx context.Context) error {
n.AddOutput(ctx, n.node.Name, "开始执行")
nodeConfig := n.node.GetConfigForNotify()
// 获取通知配置
settings, err := n.settingsRepo.GetByName(ctx, "notifyChannels")
if err != nil {
@@ -33,18 +35,14 @@ func (n *notifyNode) Run(ctx context.Context) error {
}
// 获取通知渠道
channelConfig, err := settings.GetNotifyChannelConfig(n.node.GetConfigString("channel"))
channelConfig, err := settings.GetNotifyChannelConfig(nodeConfig.Channel)
if err != nil {
n.AddOutput(ctx, n.node.Name, "获取通知渠道配置失败", err.Error())
return err
}
// 发送通知
if err := notify.SendToChannel(n.node.GetConfigString("subject"),
n.node.GetConfigString("message"),
n.node.GetConfigString("channel"),
channelConfig,
); err != nil {
if err := notify.SendToChannel(nodeConfig.Subject, nodeConfig.Message, nodeConfig.Channel, channelConfig); err != nil {
n.AddOutput(ctx, n.node.Name, "发送通知失败", err.Error())
return err
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/usual2970/certimate/internal/domain"
)
type nodeProcessor interface {
type NodeProcessor interface {
Run(ctx context.Context) error
Log(ctx context.Context) *domain.WorkflowRunLog
AddOutput(ctx context.Context, title, content string, err ...string)
@@ -18,6 +18,19 @@ type nodeLogger struct {
log *domain.WorkflowRunLog
}
type certificateRepository interface {
GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error)
}
type workflowOutputRepository interface {
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
}
type settingsRepository interface {
GetByName(ctx context.Context, name string) (*domain.Settings, error)
}
func NewNodeLogger(node *domain.WorkflowNode) *nodeLogger {
return &nodeLogger{
log: &domain.WorkflowRunLog{
@@ -45,7 +58,7 @@ func (l *nodeLogger) AddOutput(ctx context.Context, title, content string, err .
l.log.Outputs = append(l.log.Outputs, output)
}
func GetProcessor(node *domain.WorkflowNode) (nodeProcessor, error) {
func GetProcessor(node *domain.WorkflowNode) (NodeProcessor, error) {
switch node.Type {
case domain.WorkflowNodeTypeStart:
return NewStartNode(node), nil
@@ -65,15 +78,6 @@ func GetProcessor(node *domain.WorkflowNode) (nodeProcessor, error) {
return nil, errors.New("not implemented")
}
type certificateRepository interface {
GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error)
}
type workflowOutputRepository interface {
GetByNodeId(ctx context.Context, nodeId string) (*domain.WorkflowOutput, error)
Save(ctx context.Context, output *domain.WorkflowOutput, certificate *domain.Certificate, cb func(id string) error) error
}
type settingRepository interface {
GetByName(ctx context.Context, name string) (*domain.Settings, error)
func getContextWorkflowId(ctx context.Context) string {
return ctx.Value("workflow_id").(string)
}

View File

@@ -1,9 +1,10 @@
package nodeprocessor
package processor
import (
"context"
"github.com/usual2970/certimate/internal/domain"
nodes "github.com/usual2970/certimate/internal/workflow/node-processor"
)
type workflowProcessor struct {
@@ -23,26 +24,26 @@ func (w *workflowProcessor) Log(ctx context.Context) []domain.WorkflowRunLog {
}
func (w *workflowProcessor) Run(ctx context.Context) error {
ctx = WithWorkflowId(ctx, w.workflow.Id)
return w.runNode(ctx, w.workflow.Content)
ctx = setContextWorkflowId(ctx, w.workflow.Id)
return w.processNode(ctx, w.workflow.Content)
}
func (w *workflowProcessor) runNode(ctx context.Context, node *domain.WorkflowNode) error {
func (w *workflowProcessor) processNode(ctx context.Context, node *domain.WorkflowNode) error {
current := node
for current != nil {
if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {
for _, branch := range current.Branches {
if err := w.runNode(ctx, &branch); err != nil {
if err := w.processNode(ctx, &branch); err != nil {
continue
}
}
}
var runErr error
var processor nodeProcessor
var processor nodes.NodeProcessor
for {
if current.Type != domain.WorkflowNodeTypeBranch && current.Type != domain.WorkflowNodeTypeExecuteResultBranch {
processor, runErr = GetProcessor(current)
processor, runErr = nodes.GetProcessor(current)
if runErr != nil {
break
}
@@ -74,7 +75,7 @@ func (w *workflowProcessor) runNode(ctx context.Context, node *domain.WorkflowNo
return nil
}
func WithWorkflowId(ctx context.Context, id string) context.Context {
func setContextWorkflowId(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, "workflow_id", id)
}

View File

@@ -9,21 +9,22 @@ import (
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
nodeprocessor "github.com/usual2970/certimate/internal/workflow/node-processor"
"github.com/usual2970/certimate/internal/domain/dtos"
processor "github.com/usual2970/certimate/internal/workflow/processor"
)
const defaultRoutines = 10
type workflowRunData struct {
Workflow *domain.Workflow
Options *domain.WorkflowRunReq
Workflow *domain.Workflow
RunTrigger domain.WorkflowTriggerType
}
type workflowRepository interface {
ListEnabledAuto(ctx context.Context) ([]*domain.Workflow, error)
GetById(ctx context.Context, id string) (*domain.Workflow, error)
Save(ctx context.Context, workflow *domain.Workflow) error
SaveRun(ctx context.Context, run *domain.WorkflowRun) error
SaveRun(ctx context.Context, workflowRun *domain.WorkflowRun) error
}
type WorkflowService struct {
@@ -74,7 +75,7 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
scheduler := app.GetScheduler()
for _, workflow := range workflows {
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
s.Run(ctx, &domain.WorkflowRunReq{
s.Run(ctx, &dtos.WorkflowRunReq{
WorkflowId: workflow.Id,
Trigger: domain.WorkflowTriggerTypeAuto,
})
@@ -84,18 +85,15 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
return err
}
}
scheduler.Start()
app.GetLogger().Info("workflow schedule started")
return nil
}
func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunReq) error {
func (s *WorkflowService) Run(ctx context.Context, req *dtos.WorkflowRunReq) error {
// 查询
workflow, err := s.repo.GetById(ctx, options.WorkflowId)
workflow, err := s.repo.GetById(ctx, req.WorkflowId)
if err != nil {
app.GetLogger().Error("failed to get workflow", "id", options.WorkflowId, "err", err)
app.GetLogger().Error("failed to get workflow", "id", req.WorkflowId, "err", err)
return err
}
@@ -113,8 +111,8 @@ func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunRe
}
s.ch <- &workflowRunData{
Workflow: workflow,
Options: options,
Workflow: workflow,
RunTrigger: req.Trigger,
}
return nil
@@ -123,16 +121,15 @@ func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunRe
func (s *WorkflowService) run(ctx context.Context, runData *workflowRunData) error {
// 执行
workflow := runData.Workflow
options := runData.Options
run := &domain.WorkflowRun{
WorkflowId: workflow.Id,
Status: domain.WorkflowRunStatusTypeRunning,
Trigger: options.Trigger,
Trigger: runData.RunTrigger,
StartedAt: time.Now(),
EndedAt: time.Now(),
}
processor := nodeprocessor.NewWorkflowProcessor(workflow)
processor := processor.NewWorkflowProcessor(workflow)
if err := processor.Run(ctx); err != nil {
run.Status = domain.WorkflowRunStatusTypeFailed
run.EndedAt = time.Now()
@@ -165,7 +162,7 @@ func (s *WorkflowService) run(ctx context.Context, runData *workflowRunData) err
return nil
}
func (s *WorkflowService) Stop() {
func (s *WorkflowService) Stop(ctx context.Context) {
s.cancel()
s.wg.Wait()
}