diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index 47b91ca0..53081b6b 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -142,11 +142,11 @@ func (w *WorkflowDispatcher) Shutdown() { w.workerMutex.Lock() for _, worker := range w.workers { worker.Cancel() + delete(w.workers, worker.Data.WorkflowId) + delete(w.workerIdMap, worker.Data.RunId) } w.workerMutex.Unlock() w.wg.Wait() - w.workers = make(map[string]*workflowWorker) - w.workerIdMap = make(map[string]string) } func (w *WorkflowDispatcher) enqueueWorker(data *WorkflowWorkerData) { diff --git a/migrations/1739263253_updated_workflow_run.go b/migrations/1739263253_updated_workflow_run.go new file mode 100644 index 00000000..5286a7b8 --- /dev/null +++ b/migrations/1739263253_updated_workflow_run.go @@ -0,0 +1,58 @@ +package migrations + +import ( + "github.com/pocketbase/pocketbase/core" + m "github.com/pocketbase/pocketbase/migrations" +) + +func init() { + m.Register(func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("qjp8lygssgwyqyz") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(1, []byte(`{ + "cascadeDelete": true, + "collectionId": "tovyif5ax6j62ur", + "hidden": false, + "id": "m8xfsyyy", + "maxSelect": 1, + "minSelect": 0, + "name": "workflowId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + return app.Save(collection) + }, func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("qjp8lygssgwyqyz") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(1, []byte(`{ + "cascadeDelete": false, + "collectionId": "tovyif5ax6j62ur", + "hidden": false, + "id": "m8xfsyyy", + "maxSelect": 1, + "minSelect": 0, + "name": "workflowId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + return app.Save(collection) + }) +} diff --git a/migrations/1739263264_updated_workflow_output.go b/migrations/1739263264_updated_workflow_output.go new file mode 100644 index 00000000..e2add066 --- /dev/null +++ b/migrations/1739263264_updated_workflow_output.go @@ -0,0 +1,92 @@ +package migrations + +import ( + "github.com/pocketbase/pocketbase/core" + m "github.com/pocketbase/pocketbase/migrations" +) + +func init() { + m.Register(func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("bqnxb95f2cooowp") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(1, []byte(`{ + "cascadeDelete": true, + "collectionId": "tovyif5ax6j62ur", + "hidden": false, + "id": "jka88auc", + "maxSelect": 1, + "minSelect": 0, + "name": "workflowId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(2, []byte(`{ + "cascadeDelete": true, + "collectionId": "qjp8lygssgwyqyz", + "hidden": false, + "id": "relation821863227", + "maxSelect": 1, + "minSelect": 0, + "name": "runId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + return app.Save(collection) + }, func(app core.App) error { + collection, err := app.FindCollectionByNameOrId("bqnxb95f2cooowp") + if err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(1, []byte(`{ + "cascadeDelete": false, + "collectionId": "tovyif5ax6j62ur", + "hidden": false, + "id": "jka88auc", + "maxSelect": 1, + "minSelect": 0, + "name": "workflowId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + // update field + if err := collection.Fields.AddMarshaledJSONAt(2, []byte(`{ + "cascadeDelete": false, + "collectionId": "qjp8lygssgwyqyz", + "hidden": false, + "id": "relation821863227", + "maxSelect": 1, + "minSelect": 0, + "name": "runId", + "presentable": false, + "required": false, + "system": false, + "type": "relation" + }`)); err != nil { + return err + } + + return app.Save(collection) + }) +} diff --git a/ui/src/repository/_pocketbase.ts b/ui/src/repository/_pocketbase.ts index 53d2f792..983c4987 100644 --- a/ui/src/repository/_pocketbase.ts +++ b/ui/src/repository/_pocketbase.ts @@ -6,3 +6,11 @@ export const getPocketBase = () => { pb = new PocketBase("/"); return pb; }; + +export const COLLECTION_NAME_ADMIN = "_superusers"; +export const COLLECTION_NAME_ACCESS = "access"; +export const COLLECTION_NAME_CERTIFICATE = "certificate"; +export const COLLECTION_NAME_SETTINGS = "settings"; +export const COLLECTION_NAME_WORKFLOW = "workflow"; +export const COLLECTION_NAME_WORKFLOW_RUN = "workflow_run"; +export const COLLECTION_NAME_WORKFLOW_OUTPUT = "workflow_output"; diff --git a/ui/src/repository/access.ts b/ui/src/repository/access.ts index c254f386..4b21d5da 100644 --- a/ui/src/repository/access.ts +++ b/ui/src/repository/access.ts @@ -1,12 +1,10 @@ import dayjs from "dayjs"; import { type AccessModel } from "@/domain/access"; -import { getPocketBase } from "./_pocketbase"; - -const COLLECTION_NAME = "access"; +import { COLLECTION_NAME_ACCESS, getPocketBase } from "./_pocketbase"; export const list = async () => { - return await getPocketBase().collection(COLLECTION_NAME).getFullList({ + return await getPocketBase().collection(COLLECTION_NAME_ACCESS).getFullList({ filter: "deleted=null", sort: "-created", requestKey: null, @@ -15,15 +13,15 @@ export const list = async () => { export const save = async (record: MaybeModelRecord) => { if (record.id) { - return await getPocketBase().collection(COLLECTION_NAME).update(record.id, record); + return await getPocketBase().collection(COLLECTION_NAME_ACCESS).update(record.id, record); } - return await getPocketBase().collection(COLLECTION_NAME).create(record); + return await getPocketBase().collection(COLLECTION_NAME_ACCESS).create(record); }; export const remove = async (record: MaybeModelRecordWithId) => { await getPocketBase() - .collection(COLLECTION_NAME) + .collection(COLLECTION_NAME_ACCESS) .update(record.id!, { deleted: dayjs.utc().format("YYYY-MM-DD HH:mm:ss") }); return true; }; diff --git a/ui/src/repository/admin.ts b/ui/src/repository/admin.ts index 1d43eca0..da8ae5c3 100644 --- a/ui/src/repository/admin.ts +++ b/ui/src/repository/admin.ts @@ -1,9 +1,7 @@ -import { getPocketBase } from "./_pocketbase"; - -const COLLECTION_NAME = "_superusers"; +import { COLLECTION_NAME_ADMIN, getPocketBase } from "./_pocketbase"; export const authWithPassword = (username: string, password: string) => { - return getPocketBase().collection(COLLECTION_NAME).authWithPassword(username, password); + return getPocketBase().collection(COLLECTION_NAME_ADMIN).authWithPassword(username, password); }; export const getAuthStore = () => { @@ -12,6 +10,6 @@ export const getAuthStore = () => { export const save = (data: { email: string } | { password: string; passwordConfirm: string }) => { return getPocketBase() - .collection(COLLECTION_NAME) + .collection(COLLECTION_NAME_ADMIN) .update(getAuthStore().record?.id || "", data); }; diff --git a/ui/src/repository/certificate.ts b/ui/src/repository/certificate.ts index 598443ae..61de9c51 100644 --- a/ui/src/repository/certificate.ts +++ b/ui/src/repository/certificate.ts @@ -2,9 +2,7 @@ import dayjs from "dayjs"; import { type RecordListOptions } from "pocketbase"; import { type CertificateModel } from "@/domain/certificate"; -import { getPocketBase } from "./_pocketbase"; - -const COLLECTION_NAME = "certificate"; +import { COLLECTION_NAME_CERTIFICATE, getPocketBase } from "./_pocketbase"; export type ListCertificateRequest = { page?: number; @@ -35,7 +33,7 @@ export const list = async (request: ListCertificateRequest) => { }); } - return pb.collection(COLLECTION_NAME).getList(page, perPage, options); + return pb.collection(COLLECTION_NAME_CERTIFICATE).getList(page, perPage, options); }; export const listByWorkflowRunId = async (workflowRunId: string) => { @@ -48,7 +46,7 @@ export const listByWorkflowRunId = async (workflowRunId: string) => { sort: "-created", requestKey: null, }; - const items = await pb.collection(COLLECTION_NAME).getFullList(options); + const items = await pb.collection(COLLECTION_NAME_CERTIFICATE).getFullList(options); return { totalItems: items.length, items: items, @@ -57,7 +55,7 @@ export const listByWorkflowRunId = async (workflowRunId: string) => { export const remove = async (record: MaybeModelRecordWithId) => { await getPocketBase() - .collection(COLLECTION_NAME) + .collection(COLLECTION_NAME_CERTIFICATE) .update(record.id!, { deleted: dayjs.utc().format("YYYY-MM-DD HH:mm:ss") }); return true; }; diff --git a/ui/src/repository/settings.ts b/ui/src/repository/settings.ts index 8985ae0f..df9bafe0 100644 --- a/ui/src/repository/settings.ts +++ b/ui/src/repository/settings.ts @@ -1,13 +1,11 @@ import { ClientResponseError } from "pocketbase"; import { type SettingsModel, type SettingsNames } from "@/domain/settings"; -import { getPocketBase } from "./_pocketbase"; - -const COLLECTION_NAME = "settings"; +import { COLLECTION_NAME_SETTINGS, getPocketBase } from "./_pocketbase"; export const get = async >(name: SettingsNames) => { try { - const resp = await getPocketBase().collection(COLLECTION_NAME).getFirstListItem>(`name='${name}'`, { + const resp = await getPocketBase().collection(COLLECTION_NAME_SETTINGS).getFirstListItem>(`name='${name}'`, { requestKey: null, }); return resp; @@ -25,8 +23,8 @@ export const get = async >(name: SettingsNames) = export const save = async >(record: MaybeModelRecordWithId>) => { if (record.id) { - return await getPocketBase().collection(COLLECTION_NAME).update>(record.id, record); + return await getPocketBase().collection(COLLECTION_NAME_SETTINGS).update>(record.id, record); } - return await getPocketBase().collection(COLLECTION_NAME).create>(record); + return await getPocketBase().collection(COLLECTION_NAME_SETTINGS).create>(record); }; diff --git a/ui/src/repository/workflow.ts b/ui/src/repository/workflow.ts index 0241eecf..9b0f6ef5 100644 --- a/ui/src/repository/workflow.ts +++ b/ui/src/repository/workflow.ts @@ -1,9 +1,7 @@ import { type RecordListOptions, type RecordSubscription } from "pocketbase"; import { type WorkflowModel } from "@/domain/workflow"; -import { getPocketBase } from "./_pocketbase"; - -const COLLECTION_NAME = "workflow"; +import { COLLECTION_NAME_WORKFLOW, getPocketBase } from "./_pocketbase"; export type ListWorkflowRequest = { page?: number; @@ -26,11 +24,11 @@ export const list = async (request: ListWorkflowRequest) => { options.filter = pb.filter("enabled={:enabled}", { enabled: request.enabled }); } - return await pb.collection(COLLECTION_NAME).getList(page, perPage, options); + return await pb.collection(COLLECTION_NAME_WORKFLOW).getList(page, perPage, options); }; export const get = async (id: string) => { - return await getPocketBase().collection(COLLECTION_NAME).getOne(id, { + return await getPocketBase().collection(COLLECTION_NAME_WORKFLOW).getOne(id, { requestKey: null, }); }; @@ -38,21 +36,21 @@ export const get = async (id: string) => { export const save = async (record: MaybeModelRecord) => { if (record.id) { return await getPocketBase() - .collection(COLLECTION_NAME) + .collection(COLLECTION_NAME_WORKFLOW) .update(record.id as string, record); } - return await getPocketBase().collection(COLLECTION_NAME).create(record); + return await getPocketBase().collection(COLLECTION_NAME_WORKFLOW).create(record); }; export const remove = async (record: MaybeModelRecordWithId) => { - return await getPocketBase().collection(COLLECTION_NAME).delete(record.id); + return await getPocketBase().collection(COLLECTION_NAME_WORKFLOW).delete(record.id); }; export const subscribe = async (id: string, cb: (e: RecordSubscription) => void) => { - return getPocketBase().collection(COLLECTION_NAME).subscribe(id, cb); + return getPocketBase().collection(COLLECTION_NAME_WORKFLOW).subscribe(id, cb); }; export const unsubscribe = async (id: string) => { - return getPocketBase().collection(COLLECTION_NAME).unsubscribe(id); + return getPocketBase().collection(COLLECTION_NAME_WORKFLOW).unsubscribe(id); }; diff --git a/ui/src/repository/workflowRun.ts b/ui/src/repository/workflowRun.ts index cf681b70..4ede9a6f 100644 --- a/ui/src/repository/workflowRun.ts +++ b/ui/src/repository/workflowRun.ts @@ -2,9 +2,7 @@ import { type WorkflowRunModel } from "@/domain/workflowRun"; -import { getPocketBase } from "./_pocketbase"; - -const COLLECTION_NAME = "workflow_run"; +import { COLLECTION_NAME_WORKFLOW_RUN, getPocketBase } from "./_pocketbase"; export type ListWorkflowRunsRequest = { workflowId?: string; @@ -25,7 +23,7 @@ export const list = async (request: ListWorkflowRunsRequest) => { } return await getPocketBase() - .collection(COLLECTION_NAME) + .collection(COLLECTION_NAME_WORKFLOW_RUN) .getList(page, perPage, { filter: getPocketBase().filter(filter, params), sort: "-created", @@ -35,13 +33,13 @@ export const list = async (request: ListWorkflowRunsRequest) => { }; export const remove = async (record: MaybeModelRecordWithId) => { - return await getPocketBase().collection(COLLECTION_NAME).delete(record.id); + return await getPocketBase().collection(COLLECTION_NAME_WORKFLOW_RUN).delete(record.id); }; export const subscribe = async (id: string, cb: (e: RecordSubscription) => void) => { - return getPocketBase().collection(COLLECTION_NAME).subscribe(id, cb); + return getPocketBase().collection(COLLECTION_NAME_WORKFLOW_RUN).subscribe(id, cb); }; export const unsubscribe = async (id: string) => { - return getPocketBase().collection(COLLECTION_NAME).unsubscribe(id); + return getPocketBase().collection(COLLECTION_NAME_WORKFLOW_RUN).unsubscribe(id); };