From d3405edd42f0a2a91326c61093f4bf1ace923d36 Mon Sep 17 00:00:00 2001 From: pk5ls20 Date: Fri, 25 Oct 2024 05:11:10 +0800 Subject: [PATCH] refactor: packet highway & etc, kill some todo --- src/core/apis/packet.ts | 2 +- src/core/packet/highway/client.ts | 2 +- src/core/packet/highway/session.ts | 9 ++- src/core/packet/highway/uploader.ts | 89 +++++++++++++++++------------ src/core/packet/msg/element.ts | 10 ++-- src/core/packet/packer.ts | 4 +- 6 files changed, 66 insertions(+), 50 deletions(-) diff --git a/src/core/apis/packet.ts b/src/core/apis/packet.ts index b51bbff2..99982318 100644 --- a/src/core/apis/packet.ts +++ b/src/core/apis/packet.ts @@ -150,7 +150,7 @@ export class NTQQPacketApi { } } } - return Promise.all(reqList); // TODO: use promise.allSettled + return Promise.allSettled(reqList); } async sendUploadForwardMsg(msg: PacketMsg[], groupUin: number = 0) { diff --git a/src/core/packet/highway/client.ts b/src/core/packet/highway/client.ts index dcf4ca5c..4c788568 100644 --- a/src/core/packet/highway/client.ts +++ b/src/core/packet/highway/client.ts @@ -36,7 +36,7 @@ export class PacketHighwayClient { this.port = port; } - private buildDataUpTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 3600): PacketHighwayTrans { + private buildDataUpTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 1200): PacketHighwayTrans { return { uin: this.sig.uin, cmd: cmd, diff --git a/src/core/packet/highway/session.ts b/src/core/packet/highway/session.ts index e06768d7..d66df215 100644 --- a/src/core/packet/highway/session.ts +++ b/src/core/packet/highway/session.ts @@ -59,7 +59,6 @@ export class PacketHighwaySession { private async checkAvailable() { if (!this.packetClient.available) { - this.logger.logError('[Highway] packetServer not available!'); throw new Error('packetServer不可用,请参照文档 https://napneko.github.io/config/advanced 检查packetServer状态或进行配置'); } if (this.sig.sigSession === null || this.sig.sessionKey === null) { @@ -96,7 +95,7 @@ export class PacketHighwaySession { async uploadImage(peer: Peer, img: PacketMsgPicElement): Promise { await this.checkAvailable(); if (peer.chatType === ChatType.KCHATTYPEGROUP) { - await this.uploadGroupImageReq(Number(peer.peerUid), img); + await this.uploadGroupImageReq(+peer.peerUid, img); } else if (peer.chatType === ChatType.KCHATTYPEC2C) { await this.uploadC2CImageReq(peer.peerUid, img); } else { @@ -107,7 +106,7 @@ export class PacketHighwaySession { async uploadVideo(peer: Peer, video: PacketMsgVideoElement): Promise { await this.checkAvailable(); if (peer.chatType === ChatType.KCHATTYPEGROUP) { - await this.uploadGroupVideoReq(Number(peer.peerUid), video); + await this.uploadGroupVideoReq(+peer.peerUid, video); } else if (peer.chatType === ChatType.KCHATTYPEC2C) { await this.uploadC2CVideoReq(peer.peerUid, video); } else { @@ -118,7 +117,7 @@ export class PacketHighwaySession { async uploadPtt(peer: Peer, ptt: PacketMsgPttElement): Promise { await this.checkAvailable(); if (peer.chatType === ChatType.KCHATTYPEGROUP) { - await this.uploadGroupPttReq(Number(peer.peerUid), ptt); + await this.uploadGroupPttReq(+peer.peerUid, ptt); } else if (peer.chatType === ChatType.KCHATTYPEC2C) { await this.uploadC2CPttReq(peer.peerUid, ptt); } else { @@ -129,7 +128,7 @@ export class PacketHighwaySession { async uploadFile(peer: Peer, file: PacketMsgFileElement): Promise { await this.checkAvailable(); if (peer.chatType === ChatType.KCHATTYPEGROUP) { - await this.uploadGroupFileReq(Number(peer.peerUid), file); + await this.uploadGroupFileReq(+peer.peerUid, file); } else if (peer.chatType === ChatType.KCHATTYPEC2C) { await this.uploadC2CFileReq(peer.peerUid, file); } else { diff --git a/src/core/packet/highway/uploader.ts b/src/core/packet/highway/uploader.ts index d19dd2f7..ae3c3fbb 100644 --- a/src/core/packet/highway/uploader.ts +++ b/src/core/packet/highway/uploader.ts @@ -19,11 +19,20 @@ abstract class HighwayUploader { this.logger = logger; } - encryptTransExt(key: Uint8Array) { + private encryptTransExt(key: Uint8Array) { if (!this.trans.encrypt) return; this.trans.ext = tea.encrypt(Buffer.from(this.trans.ext), Buffer.from(key)); } + protected timeout(): Promise { + return new Promise((_, reject) => { + setTimeout(() => { + reject(new Error(`[Highway] tcpUpload timeout after ${this.trans.timeout}s`)); + }, (this.trans.timeout ?? Infinity) * 1000 + ); + }) + } + buildPicUpHead(offset: number, bodyLength: number, bodyMd5: Uint8Array): Uint8Array { return new NapProtoMsg(ReqDataHighwayHead).encode({ msgBaseHead: { @@ -86,16 +95,18 @@ class HighwayTcpUploaderTransform extends stream.Transform { export class HighwayTcpUploader extends HighwayUploader { async upload(): Promise { - const highwayTransForm = new HighwayTcpUploaderTransform(this); - const upload = new Promise((resolve, _) => { + const controller = new AbortController(); + const { signal } = controller; + const upload = new Promise((resolve, reject) => { + const highwayTransForm = new HighwayTcpUploaderTransform(this); const socket = net.connect(this.trans.port, this.trans.server, () => { - this.trans.data.pipe(highwayTransForm).pipe(socket, { end: false }); + this.trans.data.pipe(highwayTransForm).pipe(socket, {end: false}); }); const handleRspHeader = (header: Buffer) => { const rsp = new NapProtoMsg(RespDataHighwayHead).decode(header); if (rsp.errorCode !== 0) { - // TODO: immediately reject promise if error code is not 0 - this.logger.logWarn(`[Highway] tcpUpload failed (code: ${rsp.errorCode})`); + socket.end(); + reject(new Error(`[Highway] tcpUpload failed (code=${rsp.errorCode})`)); } const percent = ((Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength)) / Number(rsp.msgSegHead?.filesize)).toFixed(2); this.logger.logDebug(`[Highway] tcpUpload ${rsp.errorCode} | ${percent} | ${Buffer.from(header).toString('hex')}`); @@ -106,49 +117,58 @@ export class HighwayTcpUploader extends HighwayUploader { } }; socket.on('data', (chunk: Buffer) => { - try { - const [head, _] = Frame.unpack(chunk); - handleRspHeader(head); - } catch (e) { - this.logger.logError(`[Highway] tcpUpload parse response error: ${e}`); + if (signal.aborted) { + socket.end(); + reject(new Error('Upload aborted due to timeout')); } + const [head, _] = Frame.unpack(chunk); + handleRspHeader(head); }); socket.on('close', () => { this.logger.logDebug('[Highway] tcpUpload socket closed.'); resolve(); }); socket.on('error', (err) => { - this.logger.logError('[Highway] tcpUpload socket.on error:', err); + socket.end(); + reject(new Error(`[Highway] tcpUpload socket.on error: ${err}`)); }); this.trans.data.on('error', (err) => { - this.logger.logError('[Highway] tcpUpload readable error:', err); socket.end(); + reject(new Error(`[Highway] tcpUpload readable error: ${err}`)); }); }); - const timeout = new Promise((_, reject) => { - setTimeout(() => { - reject(new Error(`[Highway] tcpUpload timeout after ${this.trans.timeout}s`)); - }, (this.trans.timeout ?? Infinity) * 1000 - ); + const timeout = this.timeout().then(() => { + controller.abort(); + throw new Error('Highway TCP Upload timed out'); }); await Promise.race([upload, timeout]); } } -// TODO: timeout impl export class HighwayHttpUploader extends HighwayUploader { async upload(): Promise { - let offset = 0; - for await (const chunk of this.trans.data) { - const block = chunk as Buffer; - try { - await this.uploadBlock(block, offset); - } catch (err) { - this.logger.logError(`[Highway] httpUpload Error uploading block at offset ${offset}: ${err}`); - throw err; + const controller = new AbortController(); + const { signal } = controller; + const upload = (async () => { + let offset = 0; + for await (const chunk of this.trans.data) { + if (signal.aborted) { + throw new Error('Upload aborted due to timeout'); + } + const block = chunk as Buffer; + try { + await this.uploadBlock(block, offset); + } catch (err) { + throw new Error(`[Highway] httpUpload Error uploading block at offset ${offset}: ${err}`) + } + offset += block.length; } - offset += block.length; - } + })(); + const timeout = this.timeout().then(() => { + controller.abort(); + throw new Error('Highway HTTP Upload timed out'); + }); + await Promise.race([upload, timeout]); } private async uploadBlock(block: Buffer, offset: number): Promise { @@ -159,10 +179,7 @@ export class HighwayHttpUploader extends HighwayUploader { const [head, body] = Frame.unpack(resp); const headData = new NapProtoMsg(RespDataHighwayHead).decode(head); this.logger.logDebug(`[Highway] httpUploadBlock: ${headData.errorCode} | ${headData.msgSegHead?.retCode} | ${headData.bytesRspExtendInfo} | ${head.toString('hex')} | ${body.toString('hex')}`); - if (headData.errorCode !== 0) { - // TODO: immediately throw error if error code is not 0 - this.logger.logError(`[Highway] httpUploadBlock failed (code=${headData.errorCode})`); - } + if (headData.errorCode !== 0) throw new Error(`[Highway] httpUploadBlock failed (code=${headData.errorCode})`); } private async httpPostHighwayContent(frame: Buffer, serverURL: string): Promise { @@ -178,12 +195,12 @@ export class HighwayHttpUploader extends HighwayUploader { }, }; const req = http.request(serverURL, options, (res) => { - let data = Buffer.alloc(0); + const data: Buffer[] = []; res.on('data', (chunk) => { - data = Buffer.concat([data, chunk]); + data.push(chunk); }); res.on('end', () => { - resolve(data); + resolve(Buffer.concat(data)); }); }); req.write(frame); diff --git a/src/core/packet/msg/element.ts b/src/core/packet/msg/element.ts index e96a0c30..e036c7f8 100644 --- a/src/core/packet/msg/element.ts +++ b/src/core/packet/msg/element.ts @@ -114,7 +114,7 @@ export class PacketMsgPicElement extends IPacketMsgElement { super(element); this.path = element.picElement.sourcePath; this.name = element.picElement.fileName; - this.size = Number(element.picElement.fileSize); + this.size = +element.picElement.fileSize; this.md5 = element.picElement.md5HexStr ?? ''; this.width = element.picElement.picWidth; this.height = element.picElement.picHeight; @@ -149,11 +149,11 @@ export class PacketMsgReplyElement extends IPacketMsgElement { constructor(element: SendReplyElement) { super(element); this.messageId = BigInt(element.replyElement.replayMsgId ?? 0); - this.messageSeq = Number(element.replyElement.replayMsgSeq ?? 0); - this.messageClientSeq = Number(element.replyElement.replyMsgClientSeq ?? 0); - this.targetUin = Number(element.replyElement.senderUin ?? 0); + this.messageSeq = +(element.replyElement.replayMsgSeq ?? 0); + this.messageClientSeq = +(element.replyElement.replyMsgClientSeq ?? 0); + this.targetUin = +(element.replyElement.senderUin ?? 0); this.targetUid = element.replyElement.senderUidStr ?? ''; - this.time = Number(element.replyElement.replyMsgTime ?? 0); + this.time = +(element.replyElement.replyMsgTime ?? 0); this.elems = []; // TODO: in replyElement.sourceMsgTextElems } diff --git a/src/core/packet/packer.ts b/src/core/packet/packer.ts index 3875ab55..6f99efa9 100644 --- a/src/core/packet/packer.ts +++ b/src/core/packet/packer.ts @@ -186,7 +186,7 @@ export class PacketPacker { uploadInfo: [ { fileInfo: { - fileSize: Number(img.size), + fileSize: +img.size, fileHash: img.md5, fileSha1: this.toHexStr(await calculateSha1(img.path)), fileName: img.name, @@ -254,7 +254,7 @@ export class PacketPacker { uploadInfo: [ { fileInfo: { - fileSize: Number(img.size), + fileSize: +img.size, fileHash: img.md5, fileSha1: this.toHexStr(await calculateSha1(img.path)), fileName: img.name,