expression evaluate

This commit is contained in:
Yoan.liu
2025-05-20 18:09:42 +08:00
parent b546cf3ad0
commit 97d692910b
15 changed files with 511 additions and 5 deletions

View File

@@ -16,6 +16,7 @@ import (
type applyNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
certRepo certificateRepository
outputRepo workflowOutputRepository
@@ -25,6 +26,7 @@ func NewApplyNode(node *domain.WorkflowNode) *applyNode {
return &applyNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
certRepo: repository.NewCertificateRepository(),
outputRepo: repository.NewWorkflowOutputRepository(),
@@ -71,6 +73,7 @@ func (n *applyNode) Process(ctx context.Context) error {
n.logger.Warn("failed to parse certificate, may be the CA responded error")
return err
}
certificate := &domain.Certificate{
Source: domain.CertificateSourceTypeWorkflow,
Certificate: applyResult.CertificateFullChain,
@@ -96,6 +99,10 @@ func (n *applyNode) Process(ctx context.Context) error {
return err
}
// 添加中间结果
n.outputs["certificate.validated"] = true
n.outputs["certificate.daysLeft"] = int(time.Until(certificate.ExpireAt).Hours() / 24)
n.logger.Info("apply completed")
return nil
@@ -139,6 +146,10 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
expirationTime := time.Until(lastCertificate.ExpireAt)
if expirationTime > renewalInterval {
n.outputs["certificate.validated"] = true
n.outputs["certificate.daysLeft"] = int(expirationTime.Hours() / 24)
return true, fmt.Sprintf("the certificate has already been issued (expires in %dd, next renewal in %dd)", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays)
}
}

View File

@@ -2,6 +2,7 @@ package nodeprocessor
import (
"context"
"errors"
"github.com/usual2970/certimate/internal/domain"
)
@@ -9,16 +10,43 @@ import (
type conditionNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
}
func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
return &conditionNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
}
}
func (n *conditionNode) Process(ctx context.Context) error {
// 此类型节点不需要执行任何操作,直接返回
n.logger.Info("enter condition node: " + n.node.Name)
nodeConfig := n.node.GetConfigForCondition()
if nodeConfig.Expression == nil {
return nil
}
return nil
}
func (n *conditionNode) eval(ctx context.Context, expression domain.Expr) (any, error) {
switch expr:=expression.(type) {
case domain.CompareExpr:
left,err:= n.eval(ctx, expr.Left)
if err != nil {
return nil, err
}
right,err:= n.eval(ctx, expr.Right)
if err != nil {
return nil, err
}
case domain.LogicalExpr:
case domain.NotExpr:
case domain.VarExpr:
case domain.ConstExpr:
}
return false, errors.New("unknown expression type")
}

View File

@@ -0,0 +1,126 @@
package nodeprocessor
import (
"context"
"sync"
)
// 定义上下文键类型,避免键冲突
type workflowContextKey string
const (
nodeOutputsKey workflowContextKey = "node_outputs"
)
// 带互斥锁的节点输出容器
type nodeOutputsContainer struct {
sync.RWMutex
outputs map[string]map[string]any
}
// 创建新的并发安全的节点输出容器
func newNodeOutputsContainer() *nodeOutputsContainer {
return &nodeOutputsContainer{
outputs: make(map[string]map[string]any),
}
}
// 添加节点输出到上下文
func AddNodeOutput(ctx context.Context, nodeId string, output map[string]any) context.Context {
container := getNodeOutputsContainer(ctx)
if container == nil {
container = newNodeOutputsContainer()
}
container.Lock()
defer container.Unlock()
// 创建输出的深拷贝以避免后续修改
outputCopy := make(map[string]any, len(output))
for k, v := range output {
outputCopy[k] = v
}
container.outputs[nodeId] = outputCopy
return context.WithValue(ctx, nodeOutputsKey, container)
}
// 从上下文获取节点输出
func GetNodeOutput(ctx context.Context, nodeId string) map[string]any {
container := getNodeOutputsContainer(ctx)
if container == nil {
return nil
}
container.RLock()
defer container.RUnlock()
output, exists := container.outputs[nodeId]
if !exists {
return nil
}
outputCopy := make(map[string]any, len(output))
for k, v := range output {
outputCopy[k] = v
}
return outputCopy
}
// 获取特定节点的特定输出项
func GetNodeOutputValue(ctx context.Context, nodeId string, key string) (any, bool) {
output := GetNodeOutput(ctx, nodeId)
if output == nil {
return nil, false
}
value, exists := output[key]
return value, exists
}
// 获取所有节点输出
func GetNodeOutputs(ctx context.Context) map[string]map[string]any {
container := getNodeOutputsContainer(ctx)
if container == nil {
return nil
}
container.RLock()
defer container.RUnlock()
// 创建所有输出的深拷贝
allOutputs := make(map[string]map[string]any, len(container.outputs))
for nodeId, output := range container.outputs {
nodeCopy := make(map[string]any, len(output))
for k, v := range output {
nodeCopy[k] = v
}
allOutputs[nodeId] = nodeCopy
}
return allOutputs
}
// 获取节点输出容器
func getNodeOutputsContainer(ctx context.Context) *nodeOutputsContainer {
value := ctx.Value(nodeOutputsKey)
if value == nil {
return nil
}
return value.(*nodeOutputsContainer)
}
// 检查节点是否有输出
func HasNodeOutput(ctx context.Context, nodeId string) bool {
container := getNodeOutputsContainer(ctx)
if container == nil {
return false
}
container.RLock()
defer container.RUnlock()
_, exists := container.outputs[nodeId]
return exists
}

View File

@@ -15,6 +15,7 @@ import (
type deployNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
certRepo certificateRepository
outputRepo workflowOutputRepository
@@ -24,6 +25,7 @@ func NewDeployNode(node *domain.WorkflowNode) *deployNode {
return &deployNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
certRepo: repository.NewCertificateRepository(),
outputRepo: repository.NewWorkflowOutputRepository(),

View File

@@ -9,12 +9,14 @@ import (
type executeFailureNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
}
func NewExecuteFailureNode(node *domain.WorkflowNode) *executeFailureNode {
return &executeFailureNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
}
}

View File

@@ -9,12 +9,14 @@ import (
type executeSuccessNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
}
func NewExecuteSuccessNode(node *domain.WorkflowNode) *executeSuccessNode {
return &executeSuccessNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
}
}

View File

@@ -12,6 +12,7 @@ import (
type notifyNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
settingsRepo settingsRepository
}
@@ -20,6 +21,7 @@ func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
return &notifyNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
settingsRepo: repository.NewSettingsRepository(),
}

View File

@@ -14,6 +14,8 @@ type NodeProcessor interface {
SetLogger(*slog.Logger)
Process(ctx context.Context) error
GetOutputs() map[string]any
}
type nodeProcessor struct {
@@ -32,6 +34,20 @@ func (n *nodeProcessor) SetLogger(logger *slog.Logger) {
n.logger = logger
}
type nodeOutputer struct {
outputs map[string]any
}
func newNodeOutputer() *nodeOutputer {
return &nodeOutputer{
outputs: make(map[string]any),
}
}
func (n *nodeOutputer) GetOutputs() map[string]any {
return n.outputs
}
type certificateRepository interface {
GetByWorkflowNodeId(ctx context.Context, workflowNodeId string) (*domain.Certificate, error)
}

View File

@@ -9,12 +9,14 @@ import (
type startNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
}
func NewStartNode(node *domain.WorkflowNode) *startNode {
return &startNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/repository"
@@ -12,6 +13,7 @@ import (
type uploadNode struct {
node *domain.WorkflowNode
*nodeProcessor
*nodeOutputer
certRepo certificateRepository
outputRepo workflowOutputRepository
@@ -21,6 +23,7 @@ func NewUploadNode(node *domain.WorkflowNode) *uploadNode {
return &uploadNode{
node: node,
nodeProcessor: newNodeProcessor(node),
nodeOutputer: newNodeOutputer(),
certRepo: repository.NewCertificateRepository(),
outputRepo: repository.NewWorkflowOutputRepository(),
@@ -66,6 +69,9 @@ func (n *uploadNode) Process(ctx context.Context) error {
return err
}
n.outputs["certificate.validated"] = true
n.outputs["certificate.daysLeft"] = int(time.Until(certificate.ExpireAt).Hours() / 24)
n.logger.Info("upload completed")
return nil
@@ -85,6 +91,8 @@ func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workfl
lastCertificate, _ := n.certRepo.GetByWorkflowNodeId(ctx, n.node.Id)
if lastCertificate != nil {
n.outputs["certificate.validated"] = true
n.outputs["certificate.daysLeft"] = int(time.Until(lastCertificate.ExpireAt).Hours() / 24)
return true, "the certificate has already been uploaded"
}
}