mirror of
https://github.com/usual2970/certimate.git
synced 2025-10-04 21:44:54 +00:00
feat: workflow run status & time
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/pocketbase/pocketbase"
|
||||
@@ -19,3 +20,7 @@ func GetApp() *pocketbase.PocketBase {
|
||||
|
||||
return instance
|
||||
}
|
||||
|
||||
func GetLogger() *slog.Logger {
|
||||
return GetApp().Logger()
|
||||
}
|
||||
|
@@ -37,21 +37,21 @@ func (s *certificateService) InitSchedule(ctx context.Context) error {
|
||||
err := scheduler.Add("certificate", "0 0 * * *", func() {
|
||||
certs, err := s.repo.ListExpireSoon(context.Background())
|
||||
if err != nil {
|
||||
app.GetApp().Logger().Error("failed to get expire soon certificate", "err", err)
|
||||
app.GetLogger().Error("failed to get expire soon certificate", "err", err)
|
||||
return
|
||||
}
|
||||
msg := buildMsg(certs)
|
||||
// TODO: 空指针 Bug
|
||||
if err := notify.SendToAllChannels(msg.Subject, msg.Message); err != nil {
|
||||
app.GetApp().Logger().Error("failed to send expire soon certificate", "err", err)
|
||||
app.GetLogger().Error("failed to send expire soon certificate", "err", err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
app.GetApp().Logger().Error("failed to add schedule", "err", err)
|
||||
app.GetLogger().Error("failed to add schedule", "err", err)
|
||||
return err
|
||||
}
|
||||
scheduler.Start()
|
||||
app.GetApp().Logger().Info("certificate schedule started")
|
||||
app.GetLogger().Info("certificate schedule started")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -2,25 +2,25 @@ package domain
|
||||
|
||||
import "time"
|
||||
|
||||
type Certificate struct {
|
||||
Meta
|
||||
Source string `json:"source" db:"source"`
|
||||
SubjectAltNames string `json:"subjectAltNames" db:"subjectAltNames"`
|
||||
Certificate string `json:"certificate" db:"certificate"`
|
||||
PrivateKey string `json:"privateKey" db:"privateKey"`
|
||||
IssuerCertificate string `json:"issuerCertificate" db:"issuerCertificate"`
|
||||
EffectAt time.Time `json:"effectAt" db:"effectAt"`
|
||||
ExpireAt time.Time `json:"expireAt" db:"expireAt"`
|
||||
AcmeCertUrl string `json:"acmeCertUrl" db:"acmeCertUrl"`
|
||||
AcmeCertStableUrl string `json:"acmeCertStableUrl" db:"acmeCertStableUrl"`
|
||||
WorkflowId string `json:"workflowId" db:"workflowId"`
|
||||
WorkflowNodeId string `json:"workflowNodeId" db:"workflowNodeId"`
|
||||
WorkflowOutputId string `json:"workflowOutputId" db:"workflowOutputId"`
|
||||
}
|
||||
|
||||
type CertificateSourceType string
|
||||
|
||||
const (
|
||||
CERTIFICATE_SOURCE_WORKFLOW = CertificateSourceType("workflow")
|
||||
CERTIFICATE_SOURCE_UPLOAD = CertificateSourceType("upload")
|
||||
CertificateSourceTypeWorkflow = CertificateSourceType("workflow")
|
||||
CertificateSourceTypeUpload = CertificateSourceType("upload")
|
||||
)
|
||||
|
||||
type Certificate struct {
|
||||
Meta
|
||||
Source CertificateSourceType `json:"source" db:"source"`
|
||||
SubjectAltNames string `json:"subjectAltNames" db:"subjectAltNames"`
|
||||
Certificate string `json:"certificate" db:"certificate"`
|
||||
PrivateKey string `json:"privateKey" db:"privateKey"`
|
||||
IssuerCertificate string `json:"issuerCertificate" db:"issuerCertificate"`
|
||||
EffectAt time.Time `json:"effectAt" db:"effectAt"`
|
||||
ExpireAt time.Time `json:"expireAt" db:"expireAt"`
|
||||
AcmeCertUrl string `json:"acmeCertUrl" db:"acmeCertUrl"`
|
||||
AcmeCertStableUrl string `json:"acmeCertStableUrl" db:"acmeCertStableUrl"`
|
||||
WorkflowId string `json:"workflowId" db:"workflowId"`
|
||||
WorkflowNodeId string `json:"workflowNodeId" db:"workflowNodeId"`
|
||||
WorkflowOutputId string `json:"workflowOutputId" db:"workflowOutputId"`
|
||||
}
|
||||
|
@@ -1,48 +1,58 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/usual2970/certimate/internal/pkg/utils/maps"
|
||||
)
|
||||
|
||||
const (
|
||||
WorkflowNodeTypeStart = "start"
|
||||
WorkflowNodeTypeEnd = "end"
|
||||
WorkflowNodeTypeApply = "apply"
|
||||
WorkflowNodeTypeDeploy = "deploy"
|
||||
WorkflowNodeTypeNotify = "notify"
|
||||
WorkflowNodeTypeBranch = "branch"
|
||||
WorkflowNodeTypeCondition = "condition"
|
||||
)
|
||||
type WorkflowNodeType string
|
||||
|
||||
const (
|
||||
WorkflowTriggerAuto = "auto"
|
||||
WorkflowTriggerManual = "manual"
|
||||
WorkflowNodeTypeStart = WorkflowNodeType("start")
|
||||
WorkflowNodeTypeEnd = WorkflowNodeType("end")
|
||||
WorkflowNodeTypeApply = WorkflowNodeType("apply")
|
||||
WorkflowNodeTypeDeploy = WorkflowNodeType("deploy")
|
||||
WorkflowNodeTypeNotify = WorkflowNodeType("notify")
|
||||
WorkflowNodeTypeBranch = WorkflowNodeType("branch")
|
||||
WorkflowNodeTypeCondition = WorkflowNodeType("condition")
|
||||
)
|
||||
|
||||
type WorkflowTriggerType string
|
||||
|
||||
const (
|
||||
WorkflowTriggerTypeAuto = WorkflowTriggerType("auto")
|
||||
WorkflowTriggerTypeManual = WorkflowTriggerType("manual")
|
||||
)
|
||||
|
||||
type Workflow struct {
|
||||
Meta
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
Trigger string `json:"trigger"`
|
||||
TriggerCron string `json:"triggerCron"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Content *WorkflowNode `json:"content"`
|
||||
Draft *WorkflowNode `json:"draft"`
|
||||
HasDraft bool `json:"hasDraft"`
|
||||
Name string `json:"name" db:"name"`
|
||||
Description string `json:"description" db:"description"`
|
||||
Trigger WorkflowTriggerType `json:"trigger" db:"trigger"`
|
||||
TriggerCron string `json:"triggerCron" db:"triggerCron"`
|
||||
Enabled bool `json:"enabled" db:"enabled"`
|
||||
Content *WorkflowNode `json:"content" db:"content"`
|
||||
Draft *WorkflowNode `json:"draft" db:"draft"`
|
||||
HasDraft bool `json:"hasDraft" db:"hasDraft"`
|
||||
LastRunId string `json:"lastRunId" db:"lastRunId"`
|
||||
LastRunStatus WorkflowRunStatusType `json:"lastRunStatus" db:"lastRunStatus"`
|
||||
LastRunTime time.Time `json:"lastRunTime" db:"lastRunTime"`
|
||||
}
|
||||
|
||||
type WorkflowNode struct {
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Next *WorkflowNode `json:"next"`
|
||||
Id string `json:"id"`
|
||||
Type WorkflowNodeType `json:"type"`
|
||||
Name string `json:"name"`
|
||||
|
||||
Config map[string]any `json:"config"`
|
||||
Inputs []WorkflowNodeIO `json:"inputs"`
|
||||
Outputs []WorkflowNodeIO `json:"outputs"`
|
||||
|
||||
Validated bool `json:"validated"`
|
||||
Type string `json:"type"`
|
||||
|
||||
Next *WorkflowNode `json:"next"`
|
||||
Branches []WorkflowNode `json:"branches"`
|
||||
|
||||
Validated bool `json:"validated"`
|
||||
}
|
||||
|
||||
func (n *WorkflowNode) GetConfigString(key string) string {
|
||||
@@ -76,5 +86,6 @@ type WorkflowNodeIOValueSelector struct {
|
||||
}
|
||||
|
||||
type WorkflowRunReq struct {
|
||||
Id string `json:"id"`
|
||||
WorkflowId string `json:"workflowId"`
|
||||
Trigger WorkflowTriggerType `json:"trigger"`
|
||||
}
|
||||
|
@@ -2,15 +2,24 @@ package domain
|
||||
|
||||
import "time"
|
||||
|
||||
type WorkflowRunStatusType string
|
||||
|
||||
const (
|
||||
WorkflowRunStatusTypePending WorkflowRunStatusType = "pending"
|
||||
WorkflowRunStatusTypeRunning WorkflowRunStatusType = "running"
|
||||
WorkflowRunStatusTypeSucceeded WorkflowRunStatusType = "succeeded"
|
||||
WorkflowRunStatusTypeFailed WorkflowRunStatusType = "failed"
|
||||
)
|
||||
|
||||
type WorkflowRun struct {
|
||||
Meta
|
||||
WorkflowId string `json:"workflowId" db:"workflowId"`
|
||||
Trigger string `json:"trigger" db:"trigger"`
|
||||
StartedAt time.Time `json:"startedAt" db:"startedAt"`
|
||||
CompletedAt time.Time `json:"completedAt" db:"completedAt"`
|
||||
Logs []WorkflowRunLog `json:"logs" db:"logs"`
|
||||
Succeeded bool `json:"succeeded" db:"succeeded"`
|
||||
Error string `json:"error" db:"error"`
|
||||
WorkflowId string `json:"workflowId" db:"workflowId"`
|
||||
Status WorkflowRunStatusType `json:"status" db:"status"`
|
||||
Trigger WorkflowTriggerType `json:"trigger" db:"trigger"`
|
||||
StartedAt time.Time `json:"startedAt" db:"startedAt"`
|
||||
EndedAt time.Time `json:"endedAt" db:"endedAt"`
|
||||
Logs []WorkflowRunLog `json:"logs" db:"logs"`
|
||||
Error string `json:"error" db:"error"`
|
||||
}
|
||||
|
||||
type WorkflowRunLog struct {
|
||||
|
@@ -24,16 +24,16 @@ func (a *AccessRepository) GetById(ctx context.Context, id string) (*domain.Acce
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rs := &domain.Access{
|
||||
access := &domain.Access{
|
||||
Meta: domain.Meta{
|
||||
Id: record.GetId(),
|
||||
CreatedAt: record.GetTime("created"),
|
||||
UpdatedAt: record.GetTime("updated"),
|
||||
CreatedAt: record.GetCreated().Time(),
|
||||
UpdatedAt: record.GetUpdated().Time(),
|
||||
},
|
||||
Name: record.GetString("name"),
|
||||
Provider: record.GetString("provider"),
|
||||
Config: record.GetString("config"),
|
||||
Usage: record.GetString("usage"),
|
||||
}
|
||||
return rs, nil
|
||||
return access, nil
|
||||
}
|
||||
|
@@ -48,9 +48,9 @@ func (r *AcmeAccountRepository) GetByCAAndEmail(ca, email string) (*domain.AcmeA
|
||||
|
||||
return &domain.AcmeAccount{
|
||||
Meta: domain.Meta{
|
||||
Id: record.GetString("id"),
|
||||
CreatedAt: record.GetTime("created"),
|
||||
UpdatedAt: record.GetTime("updated"),
|
||||
Id: record.GetId(),
|
||||
CreatedAt: record.GetCreated().Time(),
|
||||
UpdatedAt: record.GetUpdated().Time(),
|
||||
},
|
||||
CA: record.GetString("ca"),
|
||||
Email: record.GetString("email"),
|
||||
|
@@ -16,7 +16,7 @@ func NewCertificateRepository() *CertificateRepository {
|
||||
func (c *CertificateRepository) ListExpireSoon(ctx context.Context) ([]domain.Certificate, error) {
|
||||
rs := []domain.Certificate{}
|
||||
if err := app.GetApp().Dao().DB().
|
||||
NewQuery("select * from certificate where expireAt > datetime('now') and expireAt < datetime('now', '+20 days')").
|
||||
NewQuery("SELECT * FROM certificate WHERE expireAt > DATETIME('now') AND expireAt < DATETIME('now', '+20 days')").
|
||||
All(&rs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -15,19 +15,19 @@ func NewSettingsRepository() *SettingsRepository {
|
||||
}
|
||||
|
||||
func (s *SettingsRepository) GetByName(ctx context.Context, name string) (*domain.Settings, error) {
|
||||
resp, err := app.GetApp().Dao().FindFirstRecordByFilter("settings", "name={:name}", dbx.Params{"name": name})
|
||||
record, err := app.GetApp().Dao().FindFirstRecordByFilter("settings", "name={:name}", dbx.Params{"name": name})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rs := &domain.Settings{
|
||||
Meta: domain.Meta{
|
||||
Id: resp.GetString("id"),
|
||||
CreatedAt: resp.GetTime("created"),
|
||||
UpdatedAt: resp.GetTime("updated"),
|
||||
Id: record.GetId(),
|
||||
CreatedAt: record.GetCreated().Time(),
|
||||
UpdatedAt: record.GetUpdated().Time(),
|
||||
},
|
||||
Name: resp.GetString("name"),
|
||||
Content: resp.GetString("content"),
|
||||
Name: record.GetString("name"),
|
||||
Content: record.GetString("content"),
|
||||
}
|
||||
|
||||
return rs, nil
|
||||
|
@@ -19,7 +19,7 @@ func (r *StatisticsRepository) Get(ctx context.Context) (*domain.Statistics, err
|
||||
certTotal := struct {
|
||||
Total int `db:"total"`
|
||||
}{}
|
||||
if err := app.GetApp().Dao().DB().NewQuery("select count(*) as total from certificate").One(&certTotal); err != nil {
|
||||
if err := app.GetApp().Dao().DB().NewQuery("SELECT COUNT(*) AS total FROM certificate").One(&certTotal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs.CertificateTotal = certTotal.Total
|
||||
@@ -29,7 +29,7 @@ func (r *StatisticsRepository) Get(ctx context.Context) (*domain.Statistics, err
|
||||
Total int `db:"total"`
|
||||
}{}
|
||||
if err := app.GetApp().Dao().DB().
|
||||
NewQuery("select count(*) as total from certificate where expireAt > datetime('now') and expireAt < datetime('now', '+20 days')").
|
||||
NewQuery("SELECT COUNT(*) AS total FROM certificate WHERE expireAt > DATETIME('now') and expireAt < DATETIME('now', '+20 days')").
|
||||
One(&certExpireSoonTotal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -40,7 +40,7 @@ func (r *StatisticsRepository) Get(ctx context.Context) (*domain.Statistics, err
|
||||
Total int `db:"total"`
|
||||
}{}
|
||||
if err := app.GetApp().Dao().DB().
|
||||
NewQuery("select count(*) as total from certificate where expireAt < datetime('now')").
|
||||
NewQuery("SELECT COUNT(*) AS total FROM certificate WHERE expireAt < DATETIME('now')").
|
||||
One(&certExpiredTotal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -50,7 +50,7 @@ func (r *StatisticsRepository) Get(ctx context.Context) (*domain.Statistics, err
|
||||
workflowTotal := struct {
|
||||
Total int `db:"total"`
|
||||
}{}
|
||||
if err := app.GetApp().Dao().DB().NewQuery("select count(*) as total from workflow").One(&workflowTotal); err != nil {
|
||||
if err := app.GetApp().Dao().DB().NewQuery("SELECT COUNT(*) AS total FROM workflow").One(&workflowTotal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs.WorkflowTotal = workflowTotal.Total
|
||||
@@ -59,7 +59,7 @@ func (r *StatisticsRepository) Get(ctx context.Context) (*domain.Statistics, err
|
||||
workflowEnabledTotal := struct {
|
||||
Total int `db:"total"`
|
||||
}{}
|
||||
if err := app.GetApp().Dao().DB().NewQuery("select count(*) as total from workflow where enabled is TRUE").One(&workflowEnabledTotal); err != nil {
|
||||
if err := app.GetApp().Dao().DB().NewQuery("SELECT COUNT(*) AS total FROM workflow WHERE enabled IS TRUE").One(&workflowEnabledTotal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs.WorkflowEnabled = workflowEnabledTotal.Total
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"github.com/pocketbase/dbx"
|
||||
"github.com/pocketbase/pocketbase/daos"
|
||||
"github.com/pocketbase/pocketbase/models"
|
||||
"github.com/usual2970/certimate/internal/app"
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
@@ -21,11 +22,12 @@ func (w *WorkflowRepository) ListEnabledAuto(ctx context.Context) ([]domain.Work
|
||||
records, err := app.GetApp().Dao().FindRecordsByFilter(
|
||||
"workflow",
|
||||
"enabled={:enabled} && trigger={:trigger}",
|
||||
"-created", 1000, 0, dbx.Params{"enabled": true, "trigger": domain.WorkflowTriggerAuto},
|
||||
"-created", 1000, 0, dbx.Params{"enabled": true, "trigger": domain.WorkflowTriggerTypeAuto},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rs := make([]domain.Workflow, 0)
|
||||
for _, record := range records {
|
||||
workflow, err := record2Workflow(record)
|
||||
@@ -34,25 +36,50 @@ func (w *WorkflowRepository) ListEnabledAuto(ctx context.Context) ([]domain.Work
|
||||
}
|
||||
rs = append(rs, *workflow)
|
||||
}
|
||||
|
||||
return rs, nil
|
||||
}
|
||||
|
||||
func (w *WorkflowRepository) SaveRunLog(ctx context.Context, log *domain.WorkflowRun) error {
|
||||
func (w *WorkflowRepository) SaveRun(ctx context.Context, run *domain.WorkflowRun) error {
|
||||
collection, err := app.GetApp().Dao().FindCollectionByNameOrId("workflow_run")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
record := models.NewRecord(collection)
|
||||
|
||||
record.Set("workflowId", log.WorkflowId)
|
||||
record.Set("trigger", log.Trigger)
|
||||
record.Set("startedAt", log.StartedAt)
|
||||
record.Set("completedAt", log.CompletedAt)
|
||||
record.Set("logs", log.Logs)
|
||||
record.Set("succeeded", log.Succeeded)
|
||||
record.Set("error", log.Error)
|
||||
err = app.GetApp().Dao().RunInTransaction(func(txDao *daos.Dao) error {
|
||||
record := models.NewRecord(collection)
|
||||
record.Set("workflowId", run.WorkflowId)
|
||||
record.Set("trigger", string(run.Trigger))
|
||||
record.Set("status", string(run.Status))
|
||||
record.Set("startedAt", run.StartedAt)
|
||||
record.Set("endedAt", run.EndedAt)
|
||||
record.Set("logs", run.Logs)
|
||||
record.Set("error", run.Error)
|
||||
err = txDao.SaveRecord(record)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return app.GetApp().Dao().SaveRecord(record)
|
||||
_, err = txDao.DB().Update(
|
||||
"workflow",
|
||||
dbx.Params{
|
||||
"lastRunId": record.GetId(),
|
||||
"lastRunStatus": record.GetString("status"),
|
||||
"lastRunTime": record.GetString("startedAt"),
|
||||
},
|
||||
dbx.NewExp("id={:id}", dbx.Params{"id": run.WorkflowId}),
|
||||
).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WorkflowRepository) Get(ctx context.Context, id string) (*domain.Workflow, error) {
|
||||
@@ -81,18 +108,20 @@ func record2Workflow(record *models.Record) (*domain.Workflow, error) {
|
||||
workflow := &domain.Workflow{
|
||||
Meta: domain.Meta{
|
||||
Id: record.GetId(),
|
||||
CreatedAt: record.GetTime("created"),
|
||||
UpdatedAt: record.GetTime("updated"),
|
||||
CreatedAt: record.GetCreated().Time(),
|
||||
UpdatedAt: record.GetUpdated().Time(),
|
||||
},
|
||||
Name: record.GetString("name"),
|
||||
Description: record.GetString("description"),
|
||||
Trigger: record.GetString("trigger"),
|
||||
TriggerCron: record.GetString("triggerCron"),
|
||||
Enabled: record.GetBool("enabled"),
|
||||
Content: content,
|
||||
Draft: draft,
|
||||
HasDraft: record.GetBool("hasDraft"),
|
||||
Name: record.GetString("name"),
|
||||
Description: record.GetString("description"),
|
||||
Trigger: domain.WorkflowTriggerType(record.GetString("trigger")),
|
||||
TriggerCron: record.GetString("triggerCron"),
|
||||
Enabled: record.GetBool("enabled"),
|
||||
Content: content,
|
||||
Draft: draft,
|
||||
HasDraft: record.GetBool("hasDraft"),
|
||||
LastRunId: record.GetString("lastRunId"),
|
||||
LastRunStatus: domain.WorkflowRunStatusType(record.GetString("lastRunStatus")),
|
||||
LastRunTime: record.GetTime("lastRunTime"),
|
||||
}
|
||||
|
||||
return workflow, nil
|
||||
}
|
||||
|
@@ -73,16 +73,16 @@ func (w *WorkflowOutputRepository) GetCertificate(ctx context.Context, nodeId st
|
||||
rs := &domain.Certificate{
|
||||
Meta: domain.Meta{
|
||||
Id: record.GetId(),
|
||||
CreatedAt: record.GetDateTime("created").Time(),
|
||||
UpdatedAt: record.GetDateTime("updated").Time(),
|
||||
CreatedAt: record.GetCreated().Time(),
|
||||
UpdatedAt: record.GetUpdated().Time(),
|
||||
},
|
||||
Source: record.GetString("source"),
|
||||
Source: domain.CertificateSourceType(record.GetString("source")),
|
||||
SubjectAltNames: record.GetString("subjectAltNames"),
|
||||
Certificate: record.GetString("certificate"),
|
||||
PrivateKey: record.GetString("privateKey"),
|
||||
IssuerCertificate: record.GetString("issuerCertificate"),
|
||||
EffectAt: record.GetDateTime("effectAt").Time(),
|
||||
ExpireAt: record.GetDateTime("expireAt").Time(),
|
||||
EffectAt: record.GetTime("effectAt"),
|
||||
ExpireAt: record.GetTime("expireAt"),
|
||||
AcmeCertUrl: record.GetString("acmeCertUrl"),
|
||||
AcmeCertStableUrl: record.GetString("acmeCertStableUrl"),
|
||||
WorkflowId: record.GetString("workflowId"),
|
||||
|
@@ -44,27 +44,28 @@ func update(ctx context.Context, record *models.Record) error {
|
||||
// 是不是自动
|
||||
// 是不是 enabled
|
||||
|
||||
id := record.Id
|
||||
workflowId := record.Id
|
||||
enabled := record.GetBool("enabled")
|
||||
trigger := record.GetString("trigger")
|
||||
|
||||
scheduler := app.GetScheduler()
|
||||
if !enabled || trigger == domain.WorkflowTriggerManual {
|
||||
scheduler.Remove(id)
|
||||
if !enabled || trigger == string(domain.WorkflowTriggerTypeManual) {
|
||||
scheduler.Remove(workflowId)
|
||||
scheduler.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
err := scheduler.Add(id, record.GetString("triggerCron"), func() {
|
||||
err := scheduler.Add(workflowId, record.GetString("triggerCron"), func() {
|
||||
NewWorkflowService(repository.NewWorkflowRepository()).Run(ctx, &domain.WorkflowRunReq{
|
||||
Id: id,
|
||||
WorkflowId: workflowId,
|
||||
Trigger: domain.WorkflowTriggerTypeAuto,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
app.GetApp().Logger().Error("add cron job failed", "err", err)
|
||||
app.GetLogger().Error("add cron job failed", "err", err)
|
||||
return fmt.Errorf("add cron job failed: %w", err)
|
||||
}
|
||||
app.GetApp().Logger().Error("add cron job failed", "subjectAltNames", record.GetString("subjectAltNames"))
|
||||
app.GetLogger().Error("add cron job failed", "subjectAltNames", record.GetString("subjectAltNames"))
|
||||
|
||||
scheduler.Start()
|
||||
return nil
|
||||
|
@@ -98,7 +98,7 @@ func (a *applyNode) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
certificateRecord := &domain.Certificate{
|
||||
Source: string(domain.CERTIFICATE_SOURCE_WORKFLOW),
|
||||
Source: domain.CertificateSourceTypeWorkflow,
|
||||
SubjectAltNames: strings.Join(certX509.DNSNames, ";"),
|
||||
Certificate: certificate.Certificate,
|
||||
PrivateKey: certificate.PrivateKey,
|
||||
|
@@ -3,6 +3,7 @@ package workflow
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/usual2970/certimate/internal/app"
|
||||
"github.com/usual2970/certimate/internal/domain"
|
||||
@@ -11,7 +12,7 @@ import (
|
||||
|
||||
type WorkflowRepository interface {
|
||||
Get(ctx context.Context, id string) (*domain.Workflow, error)
|
||||
SaveRunLog(ctx context.Context, log *domain.WorkflowRun) error
|
||||
SaveRun(ctx context.Context, run *domain.WorkflowRun) error
|
||||
ListEnabledAuto(ctx context.Context) ([]domain.Workflow, error)
|
||||
}
|
||||
|
||||
@@ -31,71 +32,80 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scheduler := app.GetScheduler()
|
||||
for _, workflow := range workflows {
|
||||
err := scheduler.Add(workflow.Id, workflow.TriggerCron, func() {
|
||||
s.Run(ctx, &domain.WorkflowRunReq{
|
||||
Id: workflow.Id,
|
||||
WorkflowId: workflow.Id,
|
||||
Trigger: domain.WorkflowTriggerTypeAuto,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
app.GetApp().Logger().Error("failed to add schedule", "err", err)
|
||||
app.GetLogger().Error("failed to add schedule", "err", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
scheduler.Start()
|
||||
app.GetApp().Logger().Info("workflow schedule started")
|
||||
|
||||
app.GetLogger().Info("workflow schedule started")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *WorkflowService) Run(ctx context.Context, req *domain.WorkflowRunReq) error {
|
||||
func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunReq) error {
|
||||
// 查询
|
||||
if req.Id == "" {
|
||||
if options.WorkflowId == "" {
|
||||
return domain.ErrInvalidParams
|
||||
}
|
||||
|
||||
workflow, err := s.repo.Get(ctx, req.Id)
|
||||
workflow, err := s.repo.Get(ctx, options.WorkflowId)
|
||||
if err != nil {
|
||||
app.GetApp().Logger().Error("failed to get workflow", "id", req.Id, "err", err)
|
||||
app.GetLogger().Error("failed to get workflow", "id", options.WorkflowId, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 执行
|
||||
if !workflow.Enabled {
|
||||
app.GetApp().Logger().Error("workflow is disabled", "id", req.Id)
|
||||
app.GetLogger().Error("workflow is disabled", "id", options.WorkflowId)
|
||||
return fmt.Errorf("workflow is disabled")
|
||||
}
|
||||
|
||||
// 执行
|
||||
run := &domain.WorkflowRun{
|
||||
WorkflowId: workflow.Id,
|
||||
Status: domain.WorkflowRunStatusTypeRunning,
|
||||
Trigger: options.Trigger,
|
||||
StartedAt: time.Now(),
|
||||
EndedAt: time.Now(),
|
||||
}
|
||||
|
||||
processor := nodeprocessor.NewWorkflowProcessor(workflow)
|
||||
if err := processor.Run(ctx); err != nil {
|
||||
log := &domain.WorkflowRun{
|
||||
WorkflowId: workflow.Id,
|
||||
Logs: processor.Log(ctx),
|
||||
Succeeded: false,
|
||||
Error: err.Error(),
|
||||
}
|
||||
if err := s.repo.SaveRunLog(ctx, log); err != nil {
|
||||
app.GetApp().Logger().Error("failed to save run log", "err", err)
|
||||
run.Status = domain.WorkflowRunStatusTypeFailed
|
||||
run.EndedAt = time.Now()
|
||||
run.Logs = processor.Log(ctx)
|
||||
run.Error = err.Error()
|
||||
|
||||
if err := s.repo.SaveRun(ctx, run); err != nil {
|
||||
app.GetLogger().Error("failed to save workflow run", "err", err)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to run workflow: %w", err)
|
||||
}
|
||||
|
||||
// 保存执行日志
|
||||
logs := processor.Log(ctx)
|
||||
runLogs := domain.WorkflowRunLogs(logs)
|
||||
runErr := runLogs.FirstError()
|
||||
succeed := true
|
||||
if runErr != "" {
|
||||
succeed = false
|
||||
runStatus := domain.WorkflowRunStatusTypeSucceeded
|
||||
runError := domain.WorkflowRunLogs(logs).FirstError()
|
||||
if runError != "" {
|
||||
runStatus = domain.WorkflowRunStatusTypeFailed
|
||||
}
|
||||
log := &domain.WorkflowRun{
|
||||
WorkflowId: workflow.Id,
|
||||
Logs: processor.Log(ctx),
|
||||
Error: runErr,
|
||||
Succeeded: succeed,
|
||||
}
|
||||
if err := s.repo.SaveRunLog(ctx, log); err != nil {
|
||||
app.GetApp().Logger().Error("failed to save run log", "err", err)
|
||||
run.Status = runStatus
|
||||
run.EndedAt = time.Now()
|
||||
run.Logs = processor.Log(ctx)
|
||||
run.Error = runError
|
||||
if err := s.repo.SaveRun(ctx, run); err != nil {
|
||||
app.GetLogger().Error("failed to save workflow run", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user