From 081e83e0bf2776a868afc45c21f398609af3e784 Mon Sep 17 00:00:00 2001 From: "Yoan.liu" Date: Fri, 13 Jun 2025 13:37:14 +0800 Subject: [PATCH] recover from panics --- internal/workflow/dispatcher/dispatcher.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index 7874b945..ae03e40d 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "log" "os" "runtime" + "runtime/debug" "strconv" "sync" "time" @@ -209,7 +211,25 @@ func (d *WorkflowDispatcher) dequeueWorker() { } func (d *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) { + var run *domain.WorkflowRun + var err error + defer func() { + // 捕获 panic,避免影响其他工作流的执行 + if r := recover(); r != nil { + log.Default().Println("WorkflowId:", data.WorkflowId, "RunId:", data.RunId) + log.Default().Println("Recovered from panic:", r) + log.Default().Println("Stack trace:", string(debug.Stack())) + if run != nil { + run.Status = domain.WorkflowRunStatusTypeFailed + run.EndedAt = time.Now() + run.Error = fmt.Sprintf("workflow run panic: %v", r) + if _, err := d.workflowRunRepo.Save(ctx, run); err != nil { + log.Default().Println("Failed to save workflow run after panic:", err) + } + } + } + <-d.semaphore d.workerMutex.Lock() delete(d.workers, data.WorkflowId) @@ -226,7 +246,7 @@ func (d *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) }() // 查询 WorkflowRun - run, err := d.workflowRunRepo.GetById(ctx, data.RunId) + run, err = d.workflowRunRepo.GetById(ctx, data.RunId) if err != nil { if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err)