feat: 背压问题

This commit is contained in:
手瓜一十雪
2025-05-07 17:14:57 +08:00
parent 0b7f126ce1
commit 303a74f8fd
2 changed files with 52 additions and 2 deletions

Binary file not shown.

View File

@@ -1,6 +1,7 @@
import { LogWrapper } from '@/common/log'; import { LogWrapper } from '@/common/log';
import * as net from 'net'; import * as net from 'net';
import * as process from 'process'; import * as process from 'process';
import { Writable } from 'stream';
/** /**
* 连接到命名管道并重定向stdout * 连接到命名管道并重定向stdout
@@ -25,12 +26,50 @@ export function connectToNamedPipe(logger: LogWrapper, timeoutMs: number = 5000)
}, timeoutMs); }, timeoutMs);
try { try {
let originalStdoutWrite = process.stdout.write.bind(process.stdout); const originalStdoutWrite = process.stdout.write.bind(process.stdout);
const pipeSocket = net.connect(pipePath, () => { const pipeSocket = net.connect(pipePath, () => {
// 清除超时 // 清除超时
clearTimeout(timeoutId); clearTimeout(timeoutId);
// 优化网络性能设置
pipeSocket.setNoDelay(true); // 减少延迟
// 设置更高的高水位线,允许更多数据缓冲
logger.log(`[StdOut] 已重定向到命名管道: ${pipePath}`); logger.log(`[StdOut] 已重定向到命名管道: ${pipePath}`);
// 创建拥有更优雅背压处理的 Writable 流
const pipeWritable = new Writable({
highWaterMark: 1024 * 64, // 64KB 高水位线
write(chunk, encoding, callback) {
if (!pipeSocket.writable) {
// 如果管道不可写退回到原始stdout
logger.log('[StdOut] 管道不可写,回退到控制台输出');
return originalStdoutWrite(chunk, encoding, callback);
}
// 尝试写入数据到管道
const canContinue = pipeSocket.write(chunk, encoding, () => {
// 数据已被发送或放入内部缓冲区
});
if (canContinue) {
// 如果返回true表示可以继续写入更多数据
// 立即通知写入流可以继续
process.nextTick(callback);
} else {
// 如果返回false表示内部缓冲区已满
// 等待drain事件再恢复写入
pipeSocket.once('drain', () => {
callback();
});
}
// 明确返回true表示写入已处理
return true;
}
});
// 重定向stdout
process.stdout.write = ( process.stdout.write = (
chunk: any, chunk: any,
encoding?: BufferEncoding | (() => void), encoding?: BufferEncoding | (() => void),
@@ -40,8 +79,11 @@ export function connectToNamedPipe(logger: LogWrapper, timeoutMs: number = 5000)
cb = encoding; cb = encoding;
encoding = undefined; encoding = undefined;
} }
return pipeSocket.write(chunk, encoding as BufferEncoding, cb);
// 使用优化的writable流处理写入
return pipeWritable.write(chunk, encoding as BufferEncoding, cb as () => void);
}; };
// 提供断开连接的方法 // 提供断开连接的方法
const disconnect = () => { const disconnect = () => {
process.stdout.write = originalStdoutWrite; process.stdout.write = originalStdoutWrite;
@@ -53,6 +95,7 @@ export function connectToNamedPipe(logger: LogWrapper, timeoutMs: number = 5000)
resolve({ disconnect }); resolve({ disconnect });
}); });
// 管道错误处理
pipeSocket.on('error', (err) => { pipeSocket.on('error', (err) => {
clearTimeout(timeoutId); clearTimeout(timeoutId);
process.stdout.write = originalStdoutWrite; process.stdout.write = originalStdoutWrite;
@@ -60,11 +103,18 @@ export function connectToNamedPipe(logger: LogWrapper, timeoutMs: number = 5000)
reject(err); reject(err);
}); });
// 管道关闭处理
pipeSocket.on('end', () => { pipeSocket.on('end', () => {
process.stdout.write = originalStdoutWrite; process.stdout.write = originalStdoutWrite;
logger.log('命名管道连接已关闭'); logger.log('命名管道连接已关闭');
}); });
// 确保在连接意外关闭时恢复stdout
pipeSocket.on('close', () => {
process.stdout.write = originalStdoutWrite;
logger.log('命名管道连接已关闭');
});
} catch (error) { } catch (error) {
clearTimeout(timeoutId); clearTimeout(timeoutId);
logger.log(`尝试连接命名管道 ${pipePath} 时发生异常:`, error); logger.log(`尝试连接命名管道 ${pipePath} 时发生异常:`, error);