Compare commits

..

3 Commits

9 changed files with 44 additions and 22 deletions

View File

@ -55,8 +55,8 @@ type WorkflowNode struct {
Inputs []WorkflowNodeIO `json:"inputs"`
Outputs []WorkflowNodeIO `json:"outputs"`
Next *WorkflowNode `json:"next"`
Branches []WorkflowNode `json:"branches"`
Next *WorkflowNode `json:"next,omitempty"`
Branches []WorkflowNode `json:"branches,omitempty"`
Validated bool `json:"validated"`
}

View File

@ -242,7 +242,7 @@ func (w *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData)
}
// 执行工作流
invoker := newWorkflowInvoker(data)
invoker := newWorkflowInvokerWithData(w.workflowRunRepo, data)
if runErr := invoker.Invoke(ctx); runErr != nil {
if errors.Is(runErr, context.Canceled) {
run.Status = domain.WorkflowRunStatusTypeCanceled

View File

@ -13,18 +13,23 @@ type workflowInvoker struct {
workflowContent *domain.WorkflowNode
runId string
runLogs []domain.WorkflowRunLog
workflowRunRepo workflowRunRepository
}
func newWorkflowInvoker(data *WorkflowWorkerData) *workflowInvoker {
func newWorkflowInvokerWithData(workflowRunRepo workflowRunRepository, data *WorkflowWorkerData) *workflowInvoker {
if data == nil {
panic("worker data is nil")
}
// TODO: 待优化,日志与执行解耦
return &workflowInvoker{
workflowId: data.WorkflowId,
workflowContent: data.WorkflowContent,
runId: data.RunId,
runLogs: make([]domain.WorkflowRunLog, 0),
workflowRunRepo: workflowRunRepo,
}
}
@ -70,6 +75,12 @@ func (w *workflowInvoker) processNode(ctx context.Context, node *domain.Workflow
log := processor.GetLog(ctx)
if log != nil {
w.runLogs = append(w.runLogs, *log)
// TODO: 待优化,把 /pkg/core/* 包下的输出写入到 DEBUG 级别的日志中
if run, err := w.workflowRunRepo.GetById(ctx, w.runId); err == nil {
run.Logs = w.runLogs
w.workflowRunRepo.Save(ctx, run)
}
}
if procErr != nil {
break

View File

@ -23,6 +23,7 @@ var (
)
func GetSingletonDispatcher(workflowRepo workflowRepository, workflowRunRepo workflowRunRepository) *WorkflowDispatcher {
// TODO: 待优化构造过程
intanceOnce.Do(func() {
instance = newWorkflowDispatcher(workflowRepo, workflowRunRepo)
})

View File

@ -55,6 +55,7 @@ func (l *nodeLogger) AppendLogRecord(ctx context.Context, level domain.WorkflowR
}
if len(err) > 0 {
record.Error = err[0]
l.log.Error = err[0]
}
l.log.Records = append(l.log.Records, record)

View File

@ -32,25 +32,28 @@ const WorkflowRunDetail = ({ data, ...props }: WorkflowRunDetailProps) => {
<Alert showIcon type="error" message={<Typography.Text type="danger">{t("workflow_run.props.status.failed")}</Typography.Text>} />
</Show>
<div className="my-4 rounded-md bg-black p-4 text-stone-200">
<div className="flex flex-col space-y-4">
{data.logs?.map((item, i) => {
return (
<div key={i} className="flex flex-col space-y-2">
<div className="font-semibold">{item.nodeName}</div>
<div className="flex flex-col space-y-1">
{item.records?.map((output, j) => {
return (
<div key={j} className="flex space-x-2 text-sm" style={{ wordBreak: "break-word" }}>
<div className="whitespace-nowrap">[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]</div>
{output.error ? <div className="text-red-500">{output.error}</div> : <div>{output.content}</div>}
</div>
);
})}
<div className="my-4">
<Typography.Title level={5}>{t("workflow_run.logs")}</Typography.Title>
<div className="rounded-md bg-black p-4 text-stone-200">
<div className="flex flex-col space-y-4">
{data.logs?.map((item, i) => {
return (
<div key={i} className="flex flex-col space-y-2">
<div className="font-semibold">{item.nodeName}</div>
<div className="flex flex-col space-y-1">
{item.records?.map((output, j) => {
return (
<div key={j} className="flex space-x-2 text-sm" style={{ wordBreak: "break-word" }}>
<div className="whitespace-nowrap">[{dayjs(output.time).format("YYYY-MM-DD HH:mm:ss")}]</div>
{output.error ? <div className="text-red-500">{output.error}</div> : <div>{output.content}</div>}
</div>
);
})}
</div>
</div>
</div>
);
})}
);
})}
</div>
</div>
</div>

View File

@ -227,6 +227,10 @@ const WorkflowRuns = ({ className, style, workflowId }: WorkflowRunsProps) => {
}
return [...prev];
});
if (cb.record.status !== WORKFLOW_RUN_STATUSES.PENDING && cb.record.status !== WORKFLOW_RUN_STATUSES.RUNNING) {
unsubscribeWorkflowRun(item.id);
}
});
}

View File

@ -18,6 +18,7 @@
"workflow_run.props.started_at": "Started at",
"workflow_run.props.ended_at": "Ended at",
"workflow_run.logs": "Logs",
"workflow_run.artifacts": "Artifacts",
"workflow_run_artifact.props.type": "Type",

View File

@ -18,6 +18,7 @@
"workflow_run.props.started_at": "开始时间",
"workflow_run.props.ended_at": "完成时间",
"workflow_run.logs": "日志",
"workflow_run.artifacts": "输出产物",
"workflow_run_artifact.props.type": "类型",