diff --git a/package.json b/package.json index b6fdf3df..0b0cbbb3 100644 --- a/package.json +++ b/package.json @@ -66,7 +66,6 @@ "dependencies": { "@ffmpeg.wasm/core-mt": "^0.13.2", "express": "^5.0.0", - "piscina": "^4.7.0", "silk-wasm": "^3.6.1", "ws": "^8.18.0" } diff --git a/src/common/audio-worker.ts b/src/common/audio-worker.ts index 64d48039..c1984946 100644 --- a/src/common/audio-worker.ts +++ b/src/common/audio-worker.ts @@ -1,9 +1,20 @@ import { encode } from 'silk-wasm'; +import { parentPort } from 'worker_threads'; export interface EncodeArgs { input: ArrayBufferView | ArrayBuffer sampleRate: number } -export default async ({ input, sampleRate }: EncodeArgs) => { +export function recvTask(cb: (taskData: T) => Promise) { + parentPort?.on('message', async (taskData: T) => { + try { + let ret = await cb(taskData); + parentPort?.postMessage(ret); + } catch (error: unknown) { + parentPort?.postMessage({ error: (error as Error).message }); + } + }); +} +recvTask(async ({ input, sampleRate }) => { return await encode(input, sampleRate); -}; +}); \ No newline at end of file diff --git a/src/common/audio.ts b/src/common/audio.ts index 07a69fe3..190a8a4d 100644 --- a/src/common/audio.ts +++ b/src/common/audio.ts @@ -1,4 +1,3 @@ -import Piscina from 'piscina'; import fsPromise from 'fs/promises'; import path from 'node:path'; import { randomUUID } from 'crypto'; @@ -6,16 +5,16 @@ import { EncodeResult, getDuration, getWavFileInfo, isSilk, isWav } from 'silk-w import { LogWrapper } from '@/common/log'; import { EncodeArgs } from '@/common/audio-worker'; import { FFmpegService } from '@/common/ffmpeg'; +import { runTask } from './worker'; +import { fileURLToPath } from 'node:url'; const ALLOW_SAMPLE_RATE = [8000, 12000, 16000, 24000, 32000, 44100, 48000]; -async function getWorkerPath() { - return new URL(/* @vite-ignore */ './audio-worker.mjs', import.meta.url).href; +function getWorkerPath() { + //return new URL(/* @vite-ignore */ './audio-worker.mjs', import.meta.url).href; + return path.join(path.dirname(fileURLToPath(import.meta.url)), 'audio-worker.mjs'); } -const piscina = new Piscina({ - filename: await getWorkerPath(), -}); async function guessDuration(pttPath: string, logger: LogWrapper) { const pttFileInfo = await fsPromise.stat(pttPath); @@ -46,7 +45,7 @@ export async function encodeSilk(filePath: string, TEMP_DIR: string, logger: Log const { input, sampleRate } = isWav(file) ? await handleWavFile(file, filePath, pcmPath) : { input: await FFmpegService.convert(filePath, pcmPath), sampleRate: 24000 }; - const silk = await piscina.run({ input: input, sampleRate: sampleRate }); + const silk = await runTask(getWorkerPath(), { input: input, sampleRate: sampleRate }); fsPromise.unlink(pcmPath).catch((e) => logger.logError('删除临时文件失败', pcmPath, e)); await fsPromise.writeFile(pttPath, Buffer.from(silk.data)); logger.log(`语音文件${filePath}转换成功!`, pttPath, '时长:', silk.duration); diff --git a/src/common/ffmpeg-worker.ts b/src/common/ffmpeg-worker.ts index 38b0afa8..40228a8d 100644 --- a/src/common/ffmpeg-worker.ts +++ b/src/common/ffmpeg-worker.ts @@ -5,6 +5,17 @@ import { readFileSync, statSync, writeFileSync } from 'fs'; import type { VideoInfo } from './video'; import { fileTypeFromFile } from 'file-type'; import imageSize from 'image-size'; +import { parentPort } from 'worker_threads'; +export function recvTask(cb: (taskData: T) => Promise) { + parentPort?.on('message', async (taskData: T) => { + try { + let ret = await cb(taskData); + parentPort?.postMessage(ret); + } catch (error: unknown) { + parentPort?.postMessage({ error: (error as Error).message }); + } + }); +} class FFmpegService { public static async extractThumbnail(videoPath: string, thumbnailPath: string): Promise { const ffmpegInstance = await FFmpeg.create({ core: '@ffmpeg.wasm/core-mt' }); @@ -137,15 +148,18 @@ interface FFmpegTask { } export default async function handleFFmpegTask({ method, args }: FFmpegTask): Promise { switch (method) { - case 'extractThumbnail': - return await FFmpegService.extractThumbnail(...args as [string, string]); - case 'convertFile': - return await FFmpegService.convertFile(...args as [string, string, string]); - case 'convert': - return await FFmpegService.convert(...args as [string, string]); - case 'getVideoInfo': - return await FFmpegService.getVideoInfo(...args as [string, string]); - default: - throw new Error(`Unknown method: ${method}`); + case 'extractThumbnail': + return await FFmpegService.extractThumbnail(...args as [string, string]); + case 'convertFile': + return await FFmpegService.convertFile(...args as [string, string, string]); + case 'convert': + return await FFmpegService.convert(...args as [string, string]); + case 'getVideoInfo': + return await FFmpegService.getVideoInfo(...args as [string, string]); + default: + throw new Error(`Unknown method: ${method}`); } -} \ No newline at end of file +} +recvTask(async ({ method, args }: FFmpegTask) => { + return await handleFFmpegTask({ method, args }); +}); \ No newline at end of file diff --git a/src/common/ffmpeg.ts b/src/common/ffmpeg.ts index dbb543f4..737c761a 100644 --- a/src/common/ffmpeg.ts +++ b/src/common/ffmpeg.ts @@ -1,6 +1,8 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import Piscina from 'piscina'; import { VideoInfo } from './video'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import { runTask } from './worker'; type EncodeArgs = { method: 'extractThumbnail' | 'convertFile' | 'convert' | 'getVideoInfo'; @@ -9,42 +11,26 @@ type EncodeArgs = { type EncodeResult = any; -async function getWorkerPath() { - return new URL(/* @vite-ignore */ './ffmpeg-worker.mjs', import.meta.url).href; +function getWorkerPath() { + return path.join(path.dirname(fileURLToPath(import.meta.url)), './ffmpeg-worker.mjs'); } export class FFmpegService { public static async extractThumbnail(videoPath: string, thumbnailPath: string): Promise { - const piscina = new Piscina({ - filename: await getWorkerPath(), - }); - await piscina.run({ method: 'extractThumbnail', args: [videoPath, thumbnailPath] }); - await piscina.destroy(); + await runTask(getWorkerPath(), { method: 'extractThumbnail', args: [videoPath, thumbnailPath] }); } public static async convertFile(inputFile: string, outputFile: string, format: string): Promise { - const piscina = new Piscina({ - filename: await getWorkerPath(), - }); - await piscina.run({ method: 'convertFile', args: [inputFile, outputFile, format] }); - await piscina.destroy(); + await runTask(getWorkerPath(), { method: 'convertFile', args: [inputFile, outputFile, format] }); } public static async convert(filePath: string, pcmPath: string): Promise { - const piscina = new Piscina({ - filename: await getWorkerPath(), - }); - const result = await piscina.run({ method: 'convert', args: [filePath, pcmPath] }); - await piscina.destroy(); + const result = await runTask(getWorkerPath(), { method: 'convert', args: [filePath, pcmPath] }); return result; } public static async getVideoInfo(videoPath: string, thumbnailPath: string): Promise { - const piscina = new Piscina({ - filename: await getWorkerPath(), - }); - const result = await piscina.run({ method: 'getVideoInfo', args: [videoPath, thumbnailPath] }); - await piscina.destroy(); + const result = await await runTask(getWorkerPath(), { method: 'getVideoInfo', args: [videoPath, thumbnailPath] }); return result; } } diff --git a/src/common/worker.ts b/src/common/worker.ts new file mode 100644 index 00000000..f14ea3bb --- /dev/null +++ b/src/common/worker.ts @@ -0,0 +1,29 @@ +import { Worker } from 'worker_threads'; + +export async function runTask(workerScript: string, taskData: T): Promise { + let worker = new Worker(workerScript); + try { + return await new Promise((resolve, reject) => { + worker.on('message', (result: R) => { + resolve(result); + }); + + worker.on('error', (error) => { + reject(new Error(`Worker error: ${error.message}`)); + }); + + worker.on('exit', (code) => { + if (code !== 0) { + reject(new Error(`Worker stopped with exit code ${code}`)); + } + }); + worker.postMessage(taskData); + }); + } catch (error: unknown) { + throw new Error(`Failed to run task: ${(error as Error).message}`); + } finally { + // Ensure the worker is terminated after the promise is settled + worker.terminate(); + } +} + diff --git a/vite.config.ts b/vite.config.ts index 9ff471cf..9d9429b6 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -8,8 +8,7 @@ const external = [ 'silk-wasm', 'ws', 'express', - '@ffmpeg.wasm/core-mt', - 'piscina' + '@ffmpeg.wasm/core-mt' ]; const nodeModules = [...builtinModules, builtinModules.map((m) => `node:${m}`)].flat();