recover from panics

This commit is contained in:
Yoan.liu 2025-06-13 13:37:14 +08:00
parent a048eb95a9
commit 081e83e0bf

View File

@ -4,8 +4,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"os" "os"
"runtime" "runtime"
"runtime/debug"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -209,7 +211,25 @@ func (d *WorkflowDispatcher) dequeueWorker() {
} }
func (d *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) { func (d *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData) {
var run *domain.WorkflowRun
var err error
defer func() { defer func() {
// 捕获 panic避免影响其他工作流的执行
if r := recover(); r != nil {
log.Default().Println("WorkflowId:", data.WorkflowId, "RunId:", data.RunId)
log.Default().Println("Recovered from panic:", r)
log.Default().Println("Stack trace:", string(debug.Stack()))
if run != nil {
run.Status = domain.WorkflowRunStatusTypeFailed
run.EndedAt = time.Now()
run.Error = fmt.Sprintf("workflow run panic: %v", r)
if _, err := d.workflowRunRepo.Save(ctx, run); err != nil {
log.Default().Println("Failed to save workflow run after panic:", err)
}
}
}
<-d.semaphore <-d.semaphore
d.workerMutex.Lock() d.workerMutex.Lock()
delete(d.workers, data.WorkflowId) delete(d.workers, data.WorkflowId)
@ -226,7 +246,7 @@ func (d *WorkflowDispatcher) work(ctx context.Context, data *WorkflowWorkerData)
}() }()
// 查询 WorkflowRun // 查询 WorkflowRun
run, err := d.workflowRunRepo.GetById(ctx, data.RunId) run, err = d.workflowRunRepo.GetById(ctx, data.RunId)
if err != nil { if err != nil {
if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err) app.GetLogger().Error(fmt.Sprintf("failed to get workflow run #%s", data.RunId), "err", err)