feat: auto cleanup workflow history runs and expired certificates

This commit is contained in:
Fu Diwei
2025-03-19 17:12:24 +08:00
parent 914c5b4870
commit e27d4f11ee
19 changed files with 355 additions and 55 deletions

View File

@@ -11,6 +11,8 @@ import (
"time"
"github.com/go-acme/lego/v4/certcrypto"
"github.com/pocketbase/dbx"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/dtos"
@@ -27,21 +29,29 @@ const (
type certificateRepository interface {
ListExpireSoon(ctx context.Context) ([]*domain.Certificate, error)
GetById(ctx context.Context, id string) (*domain.Certificate, error)
DeleteWhere(ctx context.Context, exprs ...dbx.Expression) (int, error)
}
type settingsRepository interface {
GetByName(ctx context.Context, name string) (*domain.Settings, error)
}
type CertificateService struct {
certRepo certificateRepository
certificateRepo certificateRepository
settingsRepo settingsRepository
}
func NewCertificateService(certRepo certificateRepository) *CertificateService {
func NewCertificateService(certificateRepo certificateRepository, settingsRepo settingsRepository) *CertificateService {
return &CertificateService{
certRepo: certRepo,
certificateRepo: certificateRepo,
settingsRepo: settingsRepo,
}
}
func (s *CertificateService) InitSchedule(ctx context.Context) error {
// 每日发送过期证书提醒
app.GetScheduler().MustAdd("certificateExpireSoonNotify", "0 0 * * *", func() {
certificates, err := s.certRepo.ListExpireSoon(context.Background())
certificates, err := s.certificateRepo.ListExpireSoon(context.Background())
if err != nil {
app.GetLogger().Error("failed to get certificates which expire soon", "err", err)
return
@@ -56,11 +66,37 @@ func (s *CertificateService) InitSchedule(ctx context.Context) error {
app.GetLogger().Error("failed to send notification", "err", err)
}
})
// 每日清理过期证书
app.GetScheduler().MustAdd("certificateExpiredCleanup", "0 0 * * *", func() {
settings, err := s.settingsRepo.GetByName(ctx, "persistence")
if err != nil {
app.GetLogger().Error("failed to get persistence settings", "err", err)
return
}
var settingsContent *domain.PersistenceSettingsContent
json.Unmarshal([]byte(settings.Content), &settingsContent)
if settingsContent != nil && settingsContent.ExpiredCertificatesMaxDaysRetention != 0 {
ret, err := s.certificateRepo.DeleteWhere(
context.Background(),
dbx.NewExp(fmt.Sprintf("expireAt<DATETIME('now', '-%d days')", settingsContent.ExpiredCertificatesMaxDaysRetention)),
)
if err != nil {
app.GetLogger().Error("failed to delete expired certificates", "err", err)
}
if ret > 0 {
app.GetLogger().Info(fmt.Sprintf("cleanup %d expired certificates", ret))
}
}
})
return nil
}
func (s *CertificateService) ArchiveFile(ctx context.Context, req *dtos.CertificateArchiveFileReq) (*dtos.CertificateArchiveFileResp, error) {
certificate, err := s.certRepo.GetById(ctx, req.CertificateId)
certificate, err := s.certificateRepo.GetById(ctx, req.CertificateId)
if err != nil {
return nil, err
}

View File

@@ -176,7 +176,7 @@ func createDeployer(options *deployerOptions) (deployer.Deployer, error) {
AccessKeyId: access.AccessKeyId,
AccessKeySecret: access.AccessKeySecret,
Region: maputil.GetString(options.ProviderDeployConfig, "region"),
ServiceVersion: maputil.GetString(options.ProviderDeployConfig, "serviceVersion"),
ServiceVersion: maputil.GetOrDefaultString(options.ProviderDeployConfig, "serviceVersion", "3.0"),
Domain: maputil.GetString(options.ProviderDeployConfig, "domain"),
})
return deployer, err

View File

@@ -14,12 +14,10 @@ type Settings struct {
}
type NotifyTemplatesSettingsContent struct {
NotifyTemplates []NotifyTemplate `json:"notifyTemplates"`
}
type NotifyTemplate struct {
Subject string `json:"subject"`
Message string `json:"message"`
NotifyTemplates []struct {
Subject string `json:"subject"`
Message string `json:"message"`
} `json:"notifyTemplates"`
}
type NotifyChannelsSettingsContent map[string]map[string]any
@@ -37,3 +35,8 @@ func (s *Settings) GetNotifyChannelConfig(channel string) (map[string]any, error
return v, nil
}
type PersistenceSettingsContent struct {
WorkflowRunsMaxDaysRetention int `json:"workflowRunsMaxDaysRetention"`
ExpiredCertificatesMaxDaysRetention int `json:"expiredCertificatesMaxDaysRetention"`
}

View File

@@ -23,7 +23,6 @@ type DeployerConfig struct {
// 阿里云地域。
Region string `json:"region"`
// 服务版本。
// 零值时默认为 "3.0"。
ServiceVersion string `json:"serviceVersion"`
// 自定义域名(不支持泛域名)。
Domain string `json:"domain"`
@@ -70,7 +69,7 @@ func (d *DeployerProvider) WithLogger(logger *slog.Logger) deployer.Deployer {
func (d *DeployerProvider) Deploy(ctx context.Context, certPem string, privkeyPem string) (*deployer.DeployResult, error) {
switch d.config.ServiceVersion {
case "", "3.0":
case "3.0":
if err := d.deployToFC3(ctx, certPem, privkeyPem); err != nil {
return nil, err
}

View File

@@ -67,11 +67,9 @@ func (r *CertificateRepository) GetByWorkflowNodeId(ctx context.Context, workflo
dbx.Params{"workflowNodeId": workflowNodeId},
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, domain.ErrRecordNotFound
}
return nil, err
}
if len(records) == 0 {
return nil, domain.ErrRecordNotFound
}
@@ -125,6 +123,29 @@ func (r *CertificateRepository) Save(ctx context.Context, certificate *domain.Ce
return certificate, nil
}
func (r *CertificateRepository) DeleteWhere(ctx context.Context, exprs ...dbx.Expression) (int, error) {
records, err := app.GetApp().FindAllRecords(domain.CollectionNameCertificate, exprs...)
if err != nil {
return 0, nil
}
var ret int
var errs []error
for _, record := range records {
if err := app.GetApp().Delete(record); err != nil {
errs = append(errs, err)
} else {
ret++
}
}
if len(errs) > 0 {
return ret, errors.Join(errs...)
}
return ret, nil
}
func (r *CertificateRepository) castRecordToModel(record *core.Record) (*domain.Certificate, error) {
if record == nil {
return nil, fmt.Errorf("record is nil")

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/pocketbase/dbx"
"github.com/pocketbase/pocketbase/core"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
@@ -96,6 +97,29 @@ func (r *WorkflowRunRepository) Save(ctx context.Context, workflowRun *domain.Wo
return workflowRun, nil
}
func (r *WorkflowRunRepository) DeleteWhere(ctx context.Context, exprs ...dbx.Expression) (int, error) {
records, err := app.GetApp().FindAllRecords(domain.CollectionNameWorkflowRun, exprs...)
if err != nil {
return 0, nil
}
var ret int
var errs []error
for _, record := range records {
if err := app.GetApp().Delete(record); err != nil {
errs = append(errs, err)
} else {
ret++
}
}
if len(errs) > 0 {
return ret, errors.Join(errs...)
}
return ret, nil
}
func (r *WorkflowRunRepository) castRecordToModel(record *core.Record) (*domain.WorkflowRun, error) {
if record == nil {
return nil, fmt.Errorf("record is nil")

View File

@@ -23,17 +23,15 @@ var (
)
func Register(router *router.Router[*core.RequestEvent]) {
certificateRepo := repository.NewCertificateRepository()
certificateSvc = certificate.NewCertificateService(certificateRepo)
workflowRepo := repository.NewWorkflowRepository()
workflowRunRepo := repository.NewWorkflowRunRepository()
workflowSvc = workflow.NewWorkflowService(workflowRepo, workflowRunRepo)
statisticsRepo := repository.NewStatisticsRepository()
statisticsSvc = statistics.NewStatisticsService(statisticsRepo)
certificateRepo := repository.NewCertificateRepository()
settingsRepo := repository.NewSettingsRepository()
statisticsRepo := repository.NewStatisticsRepository()
certificateSvc = certificate.NewCertificateService(certificateRepo, settingsRepo)
workflowSvc = workflow.NewWorkflowService(workflowRepo, workflowRunRepo, settingsRepo)
statisticsSvc = statistics.NewStatisticsService(statisticsRepo)
notifySvc = notify.NewNotifyService(settingsRepo)
group := router.Group("/api")

View File

@@ -10,10 +10,11 @@ import (
func Register() {
workflowRepo := repository.NewWorkflowRepository()
workflowRunRepo := repository.NewWorkflowRunRepository()
workflowSvc := workflow.NewWorkflowService(workflowRepo, workflowRunRepo)
certificateRepo := repository.NewCertificateRepository()
certificateSvc := certificate.NewCertificateService(certificateRepo)
settingsRepo := repository.NewSettingsRepository()
workflowSvc := workflow.NewWorkflowService(workflowRepo, workflowRunRepo, settingsRepo)
certificateSvc := certificate.NewCertificateService(certificateRepo, settingsRepo)
if err := InitWorkflowScheduler(workflowSvc); err != nil {
app.GetLogger().Error("failed to init workflow scheduler", "err", err)

View File

@@ -65,7 +65,7 @@ func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) er
// 反之,重新添加定时任务
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() {
workflowSrv := NewWorkflowService(repository.NewWorkflowRepository(), repository.NewWorkflowRunRepository())
workflowSrv := NewWorkflowService(repository.NewWorkflowRepository(), repository.NewWorkflowRunRepository(), repository.NewSettingsRepository())
workflowSrv.StartRun(ctx, &dtos.WorkflowStartRunReq{
WorkflowId: workflowId,
RunTrigger: domain.WorkflowTriggerTypeAuto,

View File

@@ -2,10 +2,13 @@ package workflow
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/pocketbase/dbx"
"github.com/usual2970/certimate/internal/app"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/dtos"
@@ -21,6 +24,11 @@ type workflowRepository interface {
type workflowRunRepository interface {
GetById(ctx context.Context, id string) (*domain.WorkflowRun, error)
Save(ctx context.Context, workflowRun *domain.WorkflowRun) (*domain.WorkflowRun, error)
DeleteWhere(ctx context.Context, exprs ...dbx.Expression) (int, error)
}
type settingsRepository interface {
GetByName(ctx context.Context, name string) (*domain.Settings, error)
}
type WorkflowService struct {
@@ -28,40 +36,71 @@ type WorkflowService struct {
workflowRepo workflowRepository
workflowRunRepo workflowRunRepository
settingsRepo settingsRepository
}
func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowService {
func NewWorkflowService(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository, settingsRepo settingsRepository) *WorkflowService {
srv := &WorkflowService{
dispatcher: dispatcher.GetSingletonDispatcher(),
workflowRepo: workflowRepo,
workflowRunRepo: workflowRunRepo,
settingsRepo: settingsRepo,
}
return srv
}
func (s *WorkflowService) InitSchedule(ctx context.Context) error {
workflows, err := s.workflowRepo.ListEnabledAuto(ctx)
if err != nil {
return err
}
scheduler := app.GetScheduler()
for _, workflow := range workflows {
var errs []error
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
s.StartRun(ctx, &dtos.WorkflowStartRunReq{
WorkflowId: workflow.Id,
RunTrigger: domain.WorkflowTriggerTypeAuto,
})
})
// 每日清理工作流执行历史
app.GetScheduler().MustAdd("workflowHistoryRunsCleanup", "0 0 * * *", func() {
settings, err := s.settingsRepo.GetByName(ctx, "persistence")
if err != nil {
errs = append(errs, err)
app.GetLogger().Error("failed to get persistence settings", "err", err)
return
}
if len(errs) > 0 {
return errors.Join(errs...)
var settingsContent *domain.PersistenceSettingsContent
json.Unmarshal([]byte(settings.Content), &settingsContent)
if settingsContent != nil && settingsContent.WorkflowRunsMaxDaysRetention != 0 {
ret, err := s.workflowRunRepo.DeleteWhere(
context.Background(),
dbx.NewExp(fmt.Sprintf("status!='%s'", string(domain.WorkflowRunStatusTypePending))),
dbx.NewExp(fmt.Sprintf("status!='%s'", string(domain.WorkflowRunStatusTypeRunning))),
dbx.NewExp(fmt.Sprintf("endedAt<DATETIME('now', '-%d days')", settingsContent.WorkflowRunsMaxDaysRetention)),
)
if err != nil {
app.GetLogger().Error("failed to delete workflow history runs", "err", err)
}
if ret > 0 {
app.GetLogger().Info(fmt.Sprintf("cleanup %d workflow history runs", ret))
}
}
})
// 工作流
{
workflows, err := s.workflowRepo.ListEnabledAuto(ctx)
if err != nil {
return err
}
for _, workflow := range workflows {
var errs []error
err := app.GetScheduler().Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
s.StartRun(ctx, &dtos.WorkflowStartRunReq{
WorkflowId: workflow.Id,
RunTrigger: domain.WorkflowTriggerTypeAuto,
})
})
if err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
return errors.Join(errs...)
}
}
}