refactor: packet highway & etc, kill some todo

This commit is contained in:
pk5ls20 2024-10-25 05:11:10 +08:00
parent 698947ed97
commit d3405edd42
No known key found for this signature in database
GPG Key ID: 6370ED7A169F493A
6 changed files with 66 additions and 50 deletions

View File

@ -150,7 +150,7 @@ export class NTQQPacketApi {
}
}
}
return Promise.all(reqList); // TODO: use promise.allSettled
return Promise.allSettled(reqList);
}
async sendUploadForwardMsg(msg: PacketMsg[], groupUin: number = 0) {

View File

@ -36,7 +36,7 @@ export class PacketHighwayClient {
this.port = port;
}
private buildDataUpTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 3600): PacketHighwayTrans {
private buildDataUpTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 1200): PacketHighwayTrans {
return {
uin: this.sig.uin,
cmd: cmd,

View File

@ -59,7 +59,6 @@ export class PacketHighwaySession {
private async checkAvailable() {
if (!this.packetClient.available) {
this.logger.logError('[Highway] packetServer not available!');
throw new Error('packetServer不可用请参照文档 https://napneko.github.io/config/advanced 检查packetServer状态或进行配置');
}
if (this.sig.sigSession === null || this.sig.sessionKey === null) {
@ -96,7 +95,7 @@ export class PacketHighwaySession {
async uploadImage(peer: Peer, img: PacketMsgPicElement): Promise<void> {
await this.checkAvailable();
if (peer.chatType === ChatType.KCHATTYPEGROUP) {
await this.uploadGroupImageReq(Number(peer.peerUid), img);
await this.uploadGroupImageReq(+peer.peerUid, img);
} else if (peer.chatType === ChatType.KCHATTYPEC2C) {
await this.uploadC2CImageReq(peer.peerUid, img);
} else {
@ -107,7 +106,7 @@ export class PacketHighwaySession {
async uploadVideo(peer: Peer, video: PacketMsgVideoElement): Promise<void> {
await this.checkAvailable();
if (peer.chatType === ChatType.KCHATTYPEGROUP) {
await this.uploadGroupVideoReq(Number(peer.peerUid), video);
await this.uploadGroupVideoReq(+peer.peerUid, video);
} else if (peer.chatType === ChatType.KCHATTYPEC2C) {
await this.uploadC2CVideoReq(peer.peerUid, video);
} else {
@ -118,7 +117,7 @@ export class PacketHighwaySession {
async uploadPtt(peer: Peer, ptt: PacketMsgPttElement): Promise<void> {
await this.checkAvailable();
if (peer.chatType === ChatType.KCHATTYPEGROUP) {
await this.uploadGroupPttReq(Number(peer.peerUid), ptt);
await this.uploadGroupPttReq(+peer.peerUid, ptt);
} else if (peer.chatType === ChatType.KCHATTYPEC2C) {
await this.uploadC2CPttReq(peer.peerUid, ptt);
} else {
@ -129,7 +128,7 @@ export class PacketHighwaySession {
async uploadFile(peer: Peer, file: PacketMsgFileElement): Promise<void> {
await this.checkAvailable();
if (peer.chatType === ChatType.KCHATTYPEGROUP) {
await this.uploadGroupFileReq(Number(peer.peerUid), file);
await this.uploadGroupFileReq(+peer.peerUid, file);
} else if (peer.chatType === ChatType.KCHATTYPEC2C) {
await this.uploadC2CFileReq(peer.peerUid, file);
} else {

View File

@ -19,11 +19,20 @@ abstract class HighwayUploader {
this.logger = logger;
}
encryptTransExt(key: Uint8Array) {
private encryptTransExt(key: Uint8Array) {
if (!this.trans.encrypt) return;
this.trans.ext = tea.encrypt(Buffer.from(this.trans.ext), Buffer.from(key));
}
protected timeout(): Promise<void> {
return new Promise<void>((_, reject) => {
setTimeout(() => {
reject(new Error(`[Highway] tcpUpload timeout after ${this.trans.timeout}s`));
}, (this.trans.timeout ?? Infinity) * 1000
);
})
}
buildPicUpHead(offset: number, bodyLength: number, bodyMd5: Uint8Array): Uint8Array {
return new NapProtoMsg(ReqDataHighwayHead).encode({
msgBaseHead: {
@ -86,16 +95,18 @@ class HighwayTcpUploaderTransform extends stream.Transform {
export class HighwayTcpUploader extends HighwayUploader {
async upload(): Promise<void> {
const highwayTransForm = new HighwayTcpUploaderTransform(this);
const upload = new Promise<void>((resolve, _) => {
const controller = new AbortController();
const { signal } = controller;
const upload = new Promise<void>((resolve, reject) => {
const highwayTransForm = new HighwayTcpUploaderTransform(this);
const socket = net.connect(this.trans.port, this.trans.server, () => {
this.trans.data.pipe(highwayTransForm).pipe(socket, { end: false });
this.trans.data.pipe(highwayTransForm).pipe(socket, {end: false});
});
const handleRspHeader = (header: Buffer) => {
const rsp = new NapProtoMsg(RespDataHighwayHead).decode(header);
if (rsp.errorCode !== 0) {
// TODO: immediately reject promise if error code is not 0
this.logger.logWarn(`[Highway] tcpUpload failed (code: ${rsp.errorCode})`);
socket.end();
reject(new Error(`[Highway] tcpUpload failed (code=${rsp.errorCode})`));
}
const percent = ((Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength)) / Number(rsp.msgSegHead?.filesize)).toFixed(2);
this.logger.logDebug(`[Highway] tcpUpload ${rsp.errorCode} | ${percent} | ${Buffer.from(header).toString('hex')}`);
@ -106,49 +117,58 @@ export class HighwayTcpUploader extends HighwayUploader {
}
};
socket.on('data', (chunk: Buffer) => {
try {
const [head, _] = Frame.unpack(chunk);
handleRspHeader(head);
} catch (e) {
this.logger.logError(`[Highway] tcpUpload parse response error: ${e}`);
if (signal.aborted) {
socket.end();
reject(new Error('Upload aborted due to timeout'));
}
const [head, _] = Frame.unpack(chunk);
handleRspHeader(head);
});
socket.on('close', () => {
this.logger.logDebug('[Highway] tcpUpload socket closed.');
resolve();
});
socket.on('error', (err) => {
this.logger.logError('[Highway] tcpUpload socket.on error:', err);
socket.end();
reject(new Error(`[Highway] tcpUpload socket.on error: ${err}`));
});
this.trans.data.on('error', (err) => {
this.logger.logError('[Highway] tcpUpload readable error:', err);
socket.end();
reject(new Error(`[Highway] tcpUpload readable error: ${err}`));
});
});
const timeout = new Promise<void>((_, reject) => {
setTimeout(() => {
reject(new Error(`[Highway] tcpUpload timeout after ${this.trans.timeout}s`));
}, (this.trans.timeout ?? Infinity) * 1000
);
const timeout = this.timeout().then(() => {
controller.abort();
throw new Error('Highway TCP Upload timed out');
});
await Promise.race([upload, timeout]);
}
}
// TODO: timeout impl
export class HighwayHttpUploader extends HighwayUploader {
async upload(): Promise<void> {
let offset = 0;
for await (const chunk of this.trans.data) {
const block = chunk as Buffer;
try {
await this.uploadBlock(block, offset);
} catch (err) {
this.logger.logError(`[Highway] httpUpload Error uploading block at offset ${offset}: ${err}`);
throw err;
const controller = new AbortController();
const { signal } = controller;
const upload = (async () => {
let offset = 0;
for await (const chunk of this.trans.data) {
if (signal.aborted) {
throw new Error('Upload aborted due to timeout');
}
const block = chunk as Buffer;
try {
await this.uploadBlock(block, offset);
} catch (err) {
throw new Error(`[Highway] httpUpload Error uploading block at offset ${offset}: ${err}`)
}
offset += block.length;
}
offset += block.length;
}
})();
const timeout = this.timeout().then(() => {
controller.abort();
throw new Error('Highway HTTP Upload timed out');
});
await Promise.race([upload, timeout]);
}
private async uploadBlock(block: Buffer, offset: number): Promise<void> {
@ -159,10 +179,7 @@ export class HighwayHttpUploader extends HighwayUploader {
const [head, body] = Frame.unpack(resp);
const headData = new NapProtoMsg(RespDataHighwayHead).decode(head);
this.logger.logDebug(`[Highway] httpUploadBlock: ${headData.errorCode} | ${headData.msgSegHead?.retCode} | ${headData.bytesRspExtendInfo} | ${head.toString('hex')} | ${body.toString('hex')}`);
if (headData.errorCode !== 0) {
// TODO: immediately throw error if error code is not 0
this.logger.logError(`[Highway] httpUploadBlock failed (code=${headData.errorCode})`);
}
if (headData.errorCode !== 0) throw new Error(`[Highway] httpUploadBlock failed (code=${headData.errorCode})`);
}
private async httpPostHighwayContent(frame: Buffer, serverURL: string): Promise<Buffer> {
@ -178,12 +195,12 @@ export class HighwayHttpUploader extends HighwayUploader {
},
};
const req = http.request(serverURL, options, (res) => {
let data = Buffer.alloc(0);
const data: Buffer[] = [];
res.on('data', (chunk) => {
data = Buffer.concat([data, chunk]);
data.push(chunk);
});
res.on('end', () => {
resolve(data);
resolve(Buffer.concat(data));
});
});
req.write(frame);

View File

@ -114,7 +114,7 @@ export class PacketMsgPicElement extends IPacketMsgElement<SendPicElement> {
super(element);
this.path = element.picElement.sourcePath;
this.name = element.picElement.fileName;
this.size = Number(element.picElement.fileSize);
this.size = +element.picElement.fileSize;
this.md5 = element.picElement.md5HexStr ?? '';
this.width = element.picElement.picWidth;
this.height = element.picElement.picHeight;
@ -149,11 +149,11 @@ export class PacketMsgReplyElement extends IPacketMsgElement<SendReplyElement> {
constructor(element: SendReplyElement) {
super(element);
this.messageId = BigInt(element.replyElement.replayMsgId ?? 0);
this.messageSeq = Number(element.replyElement.replayMsgSeq ?? 0);
this.messageClientSeq = Number(element.replyElement.replyMsgClientSeq ?? 0);
this.targetUin = Number(element.replyElement.senderUin ?? 0);
this.messageSeq = +(element.replyElement.replayMsgSeq ?? 0);
this.messageClientSeq = +(element.replyElement.replyMsgClientSeq ?? 0);
this.targetUin = +(element.replyElement.senderUin ?? 0);
this.targetUid = element.replyElement.senderUidStr ?? '';
this.time = Number(element.replyElement.replyMsgTime ?? 0);
this.time = +(element.replyElement.replyMsgTime ?? 0);
this.elems = []; // TODO: in replyElement.sourceMsgTextElems
}

View File

@ -186,7 +186,7 @@ export class PacketPacker {
uploadInfo: [
{
fileInfo: {
fileSize: Number(img.size),
fileSize: +img.size,
fileHash: img.md5,
fileSha1: this.toHexStr(await calculateSha1(img.path)),
fileName: img.name,
@ -254,7 +254,7 @@ export class PacketPacker {
uploadInfo: [
{
fileInfo: {
fileSize: Number(img.size),
fileSize: +img.size,
fileHash: img.md5,
fileSha1: this.toHexStr(await calculateSha1(img.path)),
fileName: img.name,