From ebffac7ba4f8a915058b4b9491f49270d368ee8a Mon Sep 17 00:00:00 2001 From: yoan <536464346@qq.com> Date: Thu, 9 Jan 2025 20:00:15 +0800 Subject: [PATCH] execute workflows asynchronously --- go.sum | 3 - internal/domain/workflow.go | 4 ++ internal/repository/workflow.go | 50 ++++++++++++---- internal/workflow/service.go | 73 ++++++++++++++++++++++- ui/src/api/workflow.ts | 13 ++++ ui/src/pages/workflows/WorkflowDetail.tsx | 40 +++++++++++-- 6 files changed, 162 insertions(+), 21 deletions(-) diff --git a/go.sum b/go.sum index abf63404..d848839f 100644 --- a/go.sum +++ b/go.sum @@ -392,7 +392,6 @@ github.com/gojek/heimdall/v7 v7.0.3/go.mod h1:Z43HtMid7ysSjmsedPTXAki6jcdcNVnjn5 github.com/gojek/valkyrie v0.0.0-20180215180059-6aee720afcdf h1:5xRGbUdOmZKoDXkGx5evVLehuCMpuO1hl701bEQqXOM= github.com/gojek/valkyrie v0.0.0-20180215180059-6aee720afcdf/go.mod h1:QzhUKaYKJmcbTnCYCAVQrroCOY7vOOI8cSQ4NbuhYf0= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= -github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.1 h1:JdqV9zKUdtaa9gdPlywC3aeoEsR681PlKC+4F5gQgeo= github.com/golang-jwt/jwt/v4 v4.5.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= @@ -700,8 +699,6 @@ github.com/pocketbase/dbx v1.11.0 h1:LpZezioMfT3K4tLrqA55wWFw1EtH1pM4tzSVa7kgszU github.com/pocketbase/dbx v1.11.0/go.mod h1:xXRCIAKTHMgUCyCKZm55pUOdvFziJjQfXaWKhu2vhMs= github.com/pocketbase/pocketbase v0.22.21 h1:DGPCxn6co8VuTV0mton4NFO/ON49XiFMszRr+Mysy48= github.com/pocketbase/pocketbase v0.22.21/go.mod h1:Cw5E4uoGhKItBIE2lJL3NfmiUr9Syk2xaNJ2G7Dssow= -github.com/pocketbase/pocketbase v0.23.12 h1:HB4THFbzaliF0C3wvpx+kNOZxIwCEMDqN3/17gn5N7E= -github.com/pocketbase/pocketbase v0.23.12/go.mod h1:OcFJNMO0Vzt3f9+lweMbup6iL7V13ckxu1pdEY6FeM0= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= diff --git a/internal/domain/workflow.go b/internal/domain/workflow.go index 2657b8a6..59474a7b 100644 --- a/internal/domain/workflow.go +++ b/internal/domain/workflow.go @@ -40,6 +40,10 @@ type Workflow struct { LastRunTime time.Time `json:"lastRunTime" db:"lastRunTime"` } +func (w *Workflow) Table() string { + return "workflow" +} + type WorkflowNode struct { Id string `json:"id"` Type WorkflowNodeType `json:"type"` diff --git a/internal/repository/workflow.go b/internal/repository/workflow.go index 7c0fc46e..e6e1d046 100644 --- a/internal/repository/workflow.go +++ b/internal/repository/workflow.go @@ -40,6 +40,39 @@ func (w *WorkflowRepository) ListEnabledAuto(ctx context.Context) ([]domain.Work return rs, nil } +func (w *WorkflowRepository) Save(ctx context.Context, workflow *domain.Workflow) error { + collection, err := app.GetApp().Dao().FindCollectionByNameOrId(workflow.Table()) + if err != nil { + return err + } + var record *models.Record + if workflow.Id == "" { + record = models.NewRecord(collection) + } else { + record, err = app.GetApp().Dao().FindRecordById(workflow.Table(), workflow.Id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return domain.ErrRecordNotFound + } + return err + } + } + + record.Set("name", workflow.Name) + record.Set("description", workflow.Description) + record.Set("trigger", string(workflow.Trigger)) + record.Set("triggerCron", workflow.TriggerCron) + record.Set("enabled", workflow.Enabled) + record.Set("content", workflow.Content) + record.Set("draft", workflow.Draft) + record.Set("hasDraft", workflow.HasDraft) + record.Set("lastRunId", workflow.LastRunId) + record.Set("lastRunStatus", string(workflow.LastRunStatus)) + record.Set("lastRunTime", workflow.LastRunTime) + + return app.GetApp().Dao().SaveRecord(record) +} + func (w *WorkflowRepository) SaveRun(ctx context.Context, run *domain.WorkflowRun) error { collection, err := app.GetApp().Dao().FindCollectionByNameOrId("workflow_run") if err != nil { @@ -60,20 +93,17 @@ func (w *WorkflowRepository) SaveRun(ctx context.Context, run *domain.WorkflowRu return err } - _, err = txDao.DB().Update( - "workflow", - dbx.Params{ - "lastRunId": record.GetId(), - "lastRunStatus": record.GetString("status"), - "lastRunTime": record.GetString("startedAt"), - }, - dbx.NewExp("id={:id}", dbx.Params{"id": run.WorkflowId}), - ).Execute() + // unable trigger sse using DB() + wordflowRecord, err := txDao.FindRecordById("workflow", run.WorkflowId) if err != nil { return err } - return nil + wordflowRecord.Set("lastRunId", record.GetId()) + wordflowRecord.Set("lastRunStatus", record.GetString("status")) + wordflowRecord.Set("lastRunTime", record.GetString("startedAt")) + + return txDao.SaveRecord(wordflowRecord) }) if err != nil { return err diff --git a/internal/workflow/service.go b/internal/workflow/service.go index 03bb8c80..83afa94b 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -2,7 +2,9 @@ package workflow import ( "context" + "errors" "fmt" + "sync" "time" "github.com/usual2970/certimate/internal/app" @@ -10,19 +12,56 @@ import ( nodeprocessor "github.com/usual2970/certimate/internal/workflow/node-processor" ) +const defaultRoutines = 10 + +type workflowRunData struct { + Workflow *domain.Workflow + Options *domain.WorkflowRunReq +} + type WorkflowRepository interface { GetById(ctx context.Context, id string) (*domain.Workflow, error) SaveRun(ctx context.Context, run *domain.WorkflowRun) error + Save(ctx context.Context, workflow *domain.Workflow) error ListEnabledAuto(ctx context.Context) ([]domain.Workflow, error) } type WorkflowService struct { - repo WorkflowRepository + ch chan *workflowRunData + repo WorkflowRepository + wg sync.WaitGroup + cancel context.CancelFunc } func NewWorkflowService(repo WorkflowRepository) *WorkflowService { - return &WorkflowService{ + rs := &WorkflowService{ repo: repo, + ch: make(chan *workflowRunData, 1), + } + + ctx, cancel := context.WithCancel(context.Background()) + rs.cancel = cancel + + rs.wg.Add(defaultRoutines) + for i := 0; i < defaultRoutines; i++ { + go rs.process(ctx) + } + + return rs +} + +func (s *WorkflowService) process(ctx context.Context) { + defer s.wg.Done() + for { + select { + case data := <-s.ch: + // 执行 + if err := s.run(ctx, data); err != nil { + app.GetLogger().Error("failed to run workflow", "id", data.Workflow.Id, "err", err) + } + case <-ctx.Done(): + return + } } } @@ -60,7 +99,32 @@ func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunRe return err } + if workflow.LastRunStatus == domain.WorkflowRunStatusTypeRunning { + return errors.New("workflow is running") + } + + // set last run + workflow.LastRunTime = time.Now() + workflow.LastRunStatus = domain.WorkflowRunStatusTypeRunning + workflow.LastRunId = "" + + if err := s.repo.Save(ctx, workflow); err != nil { + return err + } + + s.ch <- &workflowRunData{ + Workflow: workflow, + Options: options, + } + + return nil +} + +func (s *WorkflowService) run(ctx context.Context, runData *workflowRunData) error { // 执行 + workflow := runData.Workflow + options := runData.Options + run := &domain.WorkflowRun{ WorkflowId: workflow.Id, Status: domain.WorkflowRunStatusTypeRunning, @@ -100,3 +164,8 @@ func (s *WorkflowService) Run(ctx context.Context, options *domain.WorkflowRunRe return nil } + +func (s *WorkflowService) Stop() { + s.cancel() + s.wg.Wait() +} diff --git a/ui/src/api/workflow.ts b/ui/src/api/workflow.ts index f746e2c0..d545b94e 100644 --- a/ui/src/api/workflow.ts +++ b/ui/src/api/workflow.ts @@ -1,3 +1,4 @@ +import type { RecordModel, RecordSubscription } from "pocketbase"; import { ClientResponseError } from "pocketbase"; import { WORKFLOW_TRIGGERS } from "@/domain/workflow"; @@ -23,3 +24,15 @@ export const run = async (id: string) => { return resp; }; + +export const subscribe = async (id: string, cb: (e: RecordSubscription) => void) => { + const pb = getPocketBase(); + + pb.collection("workflow").subscribe(id, cb); +}; + +export const unsubscribe = async (id: string) => { + const pb = getPocketBase(); + + pb.collection("workflow").unsubscribe(id); +}; diff --git a/ui/src/pages/workflows/WorkflowDetail.tsx b/ui/src/pages/workflows/WorkflowDetail.tsx index 5dab83ef..0d89ac2a 100644 --- a/ui/src/pages/workflows/WorkflowDetail.tsx +++ b/ui/src/pages/workflows/WorkflowDetail.tsx @@ -1,4 +1,4 @@ -import { useEffect, useState } from "react"; +import { useEffect, useMemo, useState } from "react"; import { useTranslation } from "react-i18next"; import { useNavigate, useParams } from "react-router-dom"; import { @@ -17,7 +17,7 @@ import { ClientResponseError } from "pocketbase"; import { isEqual } from "radash"; import { z } from "zod"; -import { run as runWorkflow } from "@/api/workflow"; +import { run as runWorkflow, subscribe, unsubscribe } from "@/api/workflow"; import ModalForm from "@/components/ModalForm"; import Show from "@/components/Show"; import WorkflowElements from "@/components/workflow/WorkflowElements"; @@ -57,6 +57,30 @@ const WorkflowDetail = () => { const [allowDiscard, setAllowDiscard] = useState(false); const [allowRelease, setAllowRelease] = useState(false); const [allowRun, setAllowRun] = useState(false); + + const lastRunStatus = useMemo(() => { + return workflow.lastRunStatus; + }, [workflow]); + + useEffect(() => { + if (lastRunStatus && lastRunStatus == "running") { + setIsRunning(true); + } else { + setIsRunning(false); + } + }, [lastRunStatus]); + + useEffect(() => { + if (isRunning && workflowId) { + subscribe(workflowId, (e) => { + if (e.record.lastRunStatus !== "running") { + setIsRunning(false); + unsubscribe(workflowId); + } + }); + } + }, [workflowId, isRunning]); + useEffect(() => { const hasReleased = !!workflow.content; const hasChanges = workflow.hasDraft! || !isEqual(workflow.draft, workflow.content); @@ -149,13 +173,18 @@ const WorkflowDetail = () => { resolve(void 0); } - // TODO: 异步执行 promise.then(async () => { - setIsRunning(true); - try { + // subscribe before running workflow + subscribe(workflowId!, (e) => { + if (e.record.lastRunStatus !== "running") { + setIsRunning(false); + unsubscribe(workflowId!); + } + }); await runWorkflow(workflowId!); + setIsRunning(true); messageApi.success(t("common.text.operation_succeeded")); } catch (err) { if (err instanceof ClientResponseError && err.isAbort) { @@ -164,7 +193,6 @@ const WorkflowDetail = () => { console.error(err); messageApi.warning(t("common.text.operation_failed")); - } finally { setIsRunning(false); } });