diff --git a/src/core/apis/packet.ts b/src/core/apis/packet.ts index 62a67775..7c0e1545 100644 --- a/src/core/apis/packet.ts +++ b/src/core/apis/packet.ts @@ -110,6 +110,6 @@ export class NTQQPacketApi { async sendUploadForwardMsg(msg: PacketForwardNode[], groupUin: number = 0) { const data = this.packetPacker.packUploadForwardMsg(this.core.selfInfo.uid, msg, groupUin); const ret = await this.sendPacket('trpc.group.long_msg_interface.MsgService.SsoSendLongMsg', data, true); - console.log(JSON.stringify(ret)); + // console.log(JSON.stringify(ret)); } } diff --git a/src/core/packet/client.ts b/src/core/packet/client.ts index 9436e6ee..37b84822 100644 --- a/src/core/packet/client.ts +++ b/src/core/packet/client.ts @@ -113,7 +113,7 @@ export class PacketClient { this.websocket.send(JSON.stringify(initMessage)); } - private async sendCommand(cmd: string, data: string, trace_id: string, rsp: boolean = false, timeout: number = 5000, sendcb: (json: RecvPacketData) => void = () => { + private async sendCommand(cmd: string, data: string, trace_id: string, rsp: boolean = false, timeout: number = 20000, sendcb: (json: RecvPacketData) => void = () => { }): Promise { return new Promise((resolve, reject) => { if (!this.isConnected || !this.websocket) { @@ -140,7 +140,7 @@ export class PacketClient { } }); const timeoutHandle = setTimeout(() => { - reject(new Error(`sendCommand timed out after ${timeout} ms`)); + reject(new Error(`sendCommand timed out after ${timeout} ms for ${cmd} with trace_id ${trace_id}`)); }, timeout); }); } @@ -170,7 +170,7 @@ export class PacketClient { } const md5 = crypto.createHash('md5').update(data).digest('hex'); const trace_id = (this.randText(4) + md5 + data).slice(0, data.length / 2); - this.sendCommand(cmd, data, trace_id, rsp, 5000, async () => { + this.sendCommand(cmd, data, trace_id, rsp, 20000, async () => { await this.napCatCore.context.session.getMsgService().sendSsoCmdReqByContend(cmd, trace_id); }).then((res) => resolve(res)).catch((e: Error) => reject(e)); }); diff --git a/src/core/packet/highway/client.ts b/src/core/packet/highway/client.ts index f8dc85ad..0583e676 100644 --- a/src/core/packet/highway/client.ts +++ b/src/core/packet/highway/client.ts @@ -1,7 +1,7 @@ import * as stream from 'node:stream'; import {ReadStream} from "node:fs"; import {PacketHighwaySig} from "@/core/packet/highway/session"; -import {HighwayHttpUploader} from "@/core/packet/highway/uploader"; +import {HighwayHttpUploader, HighwayTcpUploader} from "@/core/packet/highway/uploader"; import {LogWrapper} from "@/common/log"; export interface PacketHighwayTrans { @@ -16,27 +16,27 @@ export interface PacketHighwayTrans { ext: Uint8Array; encrypt: boolean; timeout?: number; - ip: string; + server: string; port: number; } export class PacketHighwayClient { sig: PacketHighwaySig; - ip: string = 'htdata3.qq.com'; + server: string = 'htdata3.qq.com'; port: number = 80; logger: LogWrapper; - constructor(sig: PacketHighwaySig, logger: LogWrapper) { + constructor(sig: PacketHighwaySig, logger: LogWrapper, server: string = 'htdata3.qq.com', port: number = 80) { this.sig = sig; this.logger = logger; } changeServer(server: string, port: number) { - this.ip = server; + this.server = server; this.port = port; } - private buildTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 3600): PacketHighwayTrans { + private buildDataUpTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 3600): PacketHighwayTrans { return { uin: this.sig.uin, cmd: cmd, @@ -48,14 +48,25 @@ export class PacketHighwayClient { ext: extendInfo, encrypt: false, timeout: timeout, - ip: this.ip, + server: this.server, port: this.port, } as PacketHighwayTrans; } async upload(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array): Promise { - const trans = this.buildTrans(cmd, data, fileSize, md5, extendInfo); - const httpUploader = new HighwayHttpUploader(trans, this.logger); - await httpUploader.upload(); + const trans = this.buildDataUpTrans(cmd, data, fileSize, md5, extendInfo); + try { + const tcpUploader = new HighwayTcpUploader(trans, this.logger); + await tcpUploader.upload(); + } catch (e) { + this.logger.logError(`[Highway] upload failed: ${e}, fallback to http upload`); + try { + const httpUploader = new HighwayHttpUploader(trans, this.logger); + await httpUploader.upload(); + } catch (e) { + this.logger.logError(`[Highway] http upload failed: ${e}`); + throw e; + } + } } } diff --git a/src/core/packet/highway/frame.ts b/src/core/packet/highway/frame.ts index 8c681286..e0f87c4b 100644 --- a/src/core/packet/highway/frame.ts +++ b/src/core/packet/highway/frame.ts @@ -17,7 +17,7 @@ export class Frame{ assert(frame[0] === 0x28 && frame[frame.length - 1] === 0x29, 'Invalid frame!'); const headLen = frame.readUInt32BE(1); const bodyLen = frame.readUInt32BE(5); - assert(frame.length === 9 + headLen + bodyLen + 1, 'Frame length does not match head and body lengths!'); + // assert(frame.length === 9 + headLen + bodyLen + 1, `Frame ${frame.toString('hex')} length does not match head and body lengths!`); return [frame.subarray(9, 9 + headLen), frame.subarray(9 + headLen, 9 + headLen + bodyLen)]; } } diff --git a/src/core/packet/highway/session.ts b/src/core/packet/highway/session.ts index c26e7822..3e559482 100644 --- a/src/core/packet/highway/session.ts +++ b/src/core/packet/highway/session.ts @@ -12,7 +12,7 @@ import {PacketMsgPicElement} from "@/core/packet/msg/element"; import {NTV2RichMediaHighwayExt} from "@/core/packet/proto/highway/highway"; import {int32ip2str, oidbIpv4s2HighwayIpv4s} from "@/core/packet/highway/utils"; -export const BlockSize = 512 * 1024; +export const BlockSize = 1024 * 1024; interface HighwayServerAddr { ip: string @@ -26,8 +26,6 @@ export interface PacketHighwaySig { serverAddr: HighwayServerAddr[] } - - export class PacketHighwaySession { protected packetClient: PacketClient; protected packetHighwayClient: PacketHighwayClient; @@ -54,7 +52,7 @@ export class PacketHighwaySession { throw new Error('packetClient not available!'); } if (this.sig.sigSession === null || this.sig.sessionKey === null) { - this.logger.logError('[Highway] sigSession or sessionKey not available!'); + this.logger.logWarn('[Highway] sigSession or sessionKey not available!'); await this.prepareUpload(); } } @@ -62,7 +60,6 @@ export class PacketHighwaySession { // TODO: add signal to auto prepare when ready // TODO: refactor private async prepareUpload(): Promise { - this.logger.logDebug('[Highway] prepareUpload!'); const packet = this.packer.packHttp0x6ff_501(); const req = await this.packetClient.sendPacket('HttpConn.0x6ff_501', packet, true); const rsp = new NapProtoMsg(HttpConn0x6ff_501Response).decode( @@ -70,7 +67,6 @@ export class PacketHighwaySession { ); this.sig.sigSession = rsp.httpConn.sigSession this.sig.sessionKey = rsp.httpConn.sessionKey - this.logger.logDebug(`[Highway] sigSession ${Buffer.from(this.sig.sigSession).toString('hex')}, sessionKey ${Buffer.from(this.sig.sessionKey).toString('hex')}`) for (const info of rsp.httpConn.serverInfos) { if (info.serviceType !== 1) continue; for (const addr of info.serverAddrs) { diff --git a/src/core/packet/highway/uploader.ts b/src/core/packet/highway/uploader.ts index 398ea161..bef63045 100644 --- a/src/core/packet/highway/uploader.ts +++ b/src/core/packet/highway/uploader.ts @@ -24,7 +24,7 @@ abstract class HighwayUploader { this.trans.ext = tea.encrypt(Buffer.from(this.trans.ext), Buffer.from(key)); } - buildHead(offset: number, bodyLength: number, bodyMd5: Uint8Array): Uint8Array { + buildPicUpHead(offset: number, bodyLength: number, bodyMd5: Uint8Array): Uint8Array { return new NapProtoMsg(ReqDataHighwayHead).encode({ msgBaseHead: { version: 1, @@ -60,25 +60,24 @@ abstract class HighwayUploader { } class HighwayTcpUploaderTransform extends stream.Transform { - uploader: HighwayTcpUploader + uploader: HighwayTcpUploader; + offset: number; constructor(uploader: HighwayTcpUploader) { super(); this.uploader = uploader; + this.offset = 0; } _transform(data: Buffer, _: BufferEncoding, callback: stream.TransformCallback) { - let offset = 0; - this.uploader.logger.log(`[Highway] CALLED!!! _transform data.length = ${data.length}`); - while (offset < data.length) { - this.uploader.logger.log(`[Highway] _transform offset = ${offset}, data.length = ${data.length}`); - const chunkSize = data.length > BlockSize ? BlockSize : data.length; - this.uploader.logger.log(`[Highway] _transform calced chunkSize = ${chunkSize}`); - const chunk = data.subarray(offset, offset + chunkSize); + let chunkOffset = 0; + while (chunkOffset < data.length) { + const chunkSize = Math.min(BlockSize, data.length - chunkOffset); + const chunk = data.subarray(chunkOffset, chunkOffset + chunkSize); const chunkMd5 = crypto.createHash('md5').update(chunk).digest(); - const head = this.uploader.buildHead(offset, chunk.length, chunkMd5); - this.uploader.logger.log(`[Highway] _transform: ${offset} | ${data.length} | ${chunkMd5.toString('hex')}`); - offset += chunk.length; + const head = this.uploader.buildPicUpHead(this.offset, chunk.length, chunkMd5); + chunkOffset += chunk.length; + this.offset += chunk.length; this.push(Frame.pack(Buffer.from(head), chunk)); } callback(null); @@ -88,19 +87,19 @@ class HighwayTcpUploaderTransform extends stream.Transform { export class HighwayTcpUploader extends HighwayUploader { async upload(): Promise { const highwayTransForm = new HighwayTcpUploaderTransform(this); - const upload = new Promise((resolve, reject) => { - const socket = net.connect(this.trans.port, this.trans.ip, () => { + const upload = new Promise((resolve, _) => { + const socket = net.connect(this.trans.port, this.trans.server, () => { this.trans.data.pipe(highwayTransForm).pipe(socket, {end: false}); }) const handleRspHeader = (header: Buffer) => { const rsp = new NapProtoMsg(RespDataHighwayHead).decode(header); if (rsp.errorCode !== 0) { - this.logger.logWarn(`highway upload failed (code: ${rsp.errorCode})`); + this.logger.logWarn(`[Highway] tcpUpload failed (code: ${rsp.errorCode})`); } const percent = ((Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength)) / Number(rsp.msgSegHead?.filesize)).toFixed(2); - this.logger.log(`[Highway] ${rsp.errorCode} | ${percent} | ${Buffer.from(header).toString('hex')}`); + this.logger.logDebug(`[Highway] tcpUpload ${rsp.errorCode} | ${percent} | ${Buffer.from(header).toString('hex')}`); if (Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength) >= Number(rsp.msgSegHead?.filesize)) { - this.logger.log('[Highway] tcpUpload finished.'); + this.logger.logDebug('[Highway] tcpUpload finished.'); socket.end(); resolve(); } @@ -110,24 +109,24 @@ export class HighwayTcpUploader extends HighwayUploader { const [head, _] = Frame.unpack(chunk); handleRspHeader(head); } catch (e) { - this.logger.logError(`[Highway] upload parse response error: ${e}`); + this.logger.logError(`[Highway] tcpUpload parse response error: ${e}`); } }) socket.on('close', () => { - this.logger.log('[Highway] socket closed.'); + this.logger.logDebug('[Highway] tcpUpload socket closed.'); resolve(); }) socket.on('error', (err) => { - this.logger.logError('[Highway] socket.on tcpUpload error:', err); + this.logger.logError('[Highway] tcpUpload socket.on error:', err); }) this.trans.data.on('error', (err) => { - this.logger.logError('[Highway] readable tcpUpload error:', err); + this.logger.logError('[Highway] tcpUpload readable error:', err); socket.end(); }) }) const timeout = new Promise((_, reject) => { setTimeout(() => { - reject(new Error(`[Highway] Upload timeout after ${this.trans.timeout}s`)) + reject(new Error(`[Highway] tcpUpload timeout after ${this.trans.timeout}s`)) }, (this.trans.timeout ?? Infinity) * 1000 ) }) @@ -135,17 +134,16 @@ export class HighwayTcpUploader extends HighwayUploader { } } +// TODO: timeout impl export class HighwayHttpUploader extends HighwayUploader { async upload(): Promise { let offset = 0; - this.logger.logDebug(`[Highway] httpUpload trans=${JSON.stringify(this.trans)}`); for await (const chunk of this.trans.data) { let block = chunk as Buffer; - this.logger.logDebug(`[Highway] httpUpload chunk!!! buffer.length = ${block.length}`); try { await this.uploadBlock(block, offset); } catch (err) { - this.logger.logError(`Error uploading block at offset ${offset}: ${err}`); + this.logger.logError(`[Highway] httpUpload Error uploading block at offset ${offset}: ${err}`); throw err; } offset += block.length; @@ -153,22 +151,19 @@ export class HighwayHttpUploader extends HighwayUploader { } private async uploadBlock(block: Buffer, offset: number): Promise { - const isEnd = offset + block.length === this.trans.size; const chunkMD5 = crypto.createHash('md5').update(block).digest(); - const payload = this.buildHead(offset, block.length, chunkMD5); - // this.logger.log(`[Highway] httpUploadBlock: payload = ${Buffer.from(payload).toString('hex')}`); + const payload = this.buildPicUpHead(offset, block.length, chunkMD5); const frame = Frame.pack(Buffer.from(payload), block) - this.logger.log(`[Highway] httpUploadBlock: ${offset} | ${block.length} | ${Buffer.from(chunkMD5).toString('hex')}`); - const resp = await this.httpPostHighwayContent(frame, `http://${this.trans.ip}:${this.trans.port}/cgi-bin/httpconn?htcmd=0x6FF0087&uin=${this.trans.uin}`, isEnd); + const resp = await this.httpPostHighwayContent(frame, `http://${this.trans.server}:${this.trans.port}/cgi-bin/httpconn?htcmd=0x6FF0087&uin=${this.trans.uin}`); const [head, body] = Frame.unpack(resp); const headData = new NapProtoMsg(RespDataHighwayHead).decode(head); - this.logger.log(`[Highway] httpUploadBlock: ${headData.errorCode} | ${headData.msgSegHead?.retCode} | ${headData.bytesRspExtendInfo} | ${head.toString('hex')} | ${body.toString('hex')}`); + this.logger.logDebug(`[Highway] httpUploadBlock: ${headData.errorCode} | ${headData.msgSegHead?.retCode} | ${headData.bytesRspExtendInfo} | ${head.toString('hex')} | ${body.toString('hex')}`); if (headData.errorCode !== 0) { this.logger.logError(`[Highway] httpUploadBlock failed (code=${headData.errorCode})`); } } - private async httpPostHighwayContent(frame: Buffer, serverURL: string, end: boolean): Promise { + private async httpPostHighwayContent(frame: Buffer, serverURL: string): Promise { return new Promise((resolve, reject) => { try { const options: http.RequestOptions = { @@ -186,7 +181,6 @@ export class HighwayHttpUploader extends HighwayUploader { data = Buffer.concat([data, chunk]); }); res.on('end', () => { - // console.log(`[Highway] postHighwayContent: ${data.toString('hex')}`); resolve(data); }); });